You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/17 15:35:26 UTC
[1/2] flink git commit: [FLINK-5932] initializeState() before legacy
state restoring in operator
Repository: flink
Updated Branches:
refs/heads/master 8765f3241 -> 521a53d9a
[FLINK-5932] initializeState() before legacy state restoring in operator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d998ff91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d998ff91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d998ff91
Branch: refs/heads/master
Commit: d998ff9175ec84aa5685fbd6ebe79341a4da8e8f
Parents: 8765f32
Author: kl0u <kk...@gmail.com>
Authored: Tue Feb 28 22:31:01 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Mar 17 16:32:40 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumerBase.java | 2 +-
.../streaming/api/operators/AbstractStreamOperator.java | 10 ++++++++--
2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d998ff91/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 6858509..d409027 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -574,7 +574,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
- restoredState = restoredOffsets;
+ restoredState = restoredOffsets.isEmpty() ? null : restoredOffsets;
if (LOG.isDebugEnabled()) {
LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
http://git-wip-us.apache.org/repos/asf/flink/blob/d998ff91/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index ef23be9..b6f86a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -215,8 +215,6 @@ public abstract class AbstractStreamOperator<OUT>
if (restoring) {
- restoreStreamCheckpointed(stateHandles);
-
//pass directly
operatorStateHandlesBackend = stateHandles.getManagedOperatorState();
operatorStateHandlesRaw = stateHandles.getRawOperatorState();
@@ -240,6 +238,14 @@ public abstract class AbstractStreamOperator<OUT>
getContainingTask().getCancelables()); // access to register streams for canceling
initializeState(initializationContext);
+
+ if (restoring) {
+
+ // finally restore the legacy state in case we are
+ // migrating from a previous Flink version.
+
+ restoreStreamCheckpointed(stateHandles);
+ }
}
@Deprecated
[2/2] flink git commit: [FLINK-5846] [cep] Make the CEP operators
backwards compatible with Flink 1.1
Posted by kk...@apache.org.
[FLINK-5846] [cep] Make the CEP operators backwards compatible with Flink 1.1
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/521a53d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/521a53d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/521a53d9
Branch: refs/heads/master
Commit: 521a53d9ad68a3f16a32e08843a6fca2bd4e439d
Parents: d998ff9
Author: kl0u <kk...@gmail.com>
Authored: Mon Feb 20 14:51:51 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Mar 17 16:32:41 2017 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/cep/nfa/NFA.java | 47 ++--
.../AbstractKeyedCEPPatternOperator.java | 85 +++++-
.../flink/cep/operator/CEPOperatorUtils.java | 12 +-
.../cep/operator/KeyedCEPPatternOperator.java | 5 +-
.../TimeoutKeyedCEPPatternOperator.java | 5 +-
.../cep/operator/CEPMigration11to13Test.java | 268 +++++++++++++++++++
.../flink/cep/operator/CEPOperatorTest.java | 6 +-
.../flink/cep/operator/CEPRescalingTest.java | 3 +-
.../src/test/resources/cep-keyed-snapshot-1.1 | Bin 0 -> 5612 bytes
.../test/resources/cep-non-keyed-snapshot-1.1 | Bin 0 -> 3274 bytes
.../MultiplexingStreamRecordSerializer.java | 229 ++++++++++++++++
.../streamrecord/StreamRecordSerializer.java | 150 +++++++++++
pom.xml | 2 +
13 files changed, 769 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 8d87fd8..12afe4e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -119,6 +119,8 @@ public class NFA<T> implements Serializable {
*/
private transient Queue<ComputationState<T>> computationStates;
+ private StateTransitionComparator<T> stateTransitionComparator;
+
public NFA(
final TypeSerializer<T> eventSerializer,
final long windowTime,
@@ -131,6 +133,7 @@ public class NFA<T> implements Serializable {
this.computationStates = new LinkedList<>();
this.states = new HashSet<>();
this.startEventCounter = 1;
+ this.stateTransitionComparator = new StateTransitionComparator<>();
}
public Set<State<T>> getStates() {
@@ -262,22 +265,6 @@ public class NFA<T> implements Serializable {
}
/**
- * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
- */
- private interface StateTransitionComparator<T> extends Comparator<StateTransition<T>>, Serializable {}
- private final Comparator<StateTransition<T>> stateTransitionComparator = new StateTransitionComparator<T>() {
- private static final long serialVersionUID = -2775474935413622278L;
-
- @Override
- public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
- if (o1.getAction() == o2.getAction()) {
- return 0;
- }
- return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
- }
- };
-
- /**
* Computes the next computation states based on the given computation state, the current event,
* its timestamp and the internal state machine.
*
@@ -301,6 +288,13 @@ public class NFA<T> implements Serializable {
State<T> currentState = states.pop();
final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions());
+ // this is for when we restore from legacy. In that case, the comparator is null
+ // as it did not exist in the previous Flink versions, so we have to initialize it here.
+
+ if (stateTransitionComparator == null) {
+ stateTransitionComparator = new StateTransitionComparator();
+ }
+
// impose the IGNORE will be processed last
Collections.sort(stateTransitions, stateTransitionComparator);
@@ -601,10 +595,7 @@ public class NFA<T> implements Serializable {
ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source));
try {
- @SuppressWarnings("unchecked")
- NFA<T> nfa = null;
- nfa = (NFA<T>) ois.readObject();
- return nfa;
+ return (NFA<T>) ois.readObject();
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not deserialize NFA.", e);
}
@@ -637,4 +628,20 @@ public class NFA<T> implements Serializable {
return getClass().hashCode();
}
}
+
+ /**
+ * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
+ */
+ private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> {
+
+ private static final long serialVersionUID = -2775474935413622278L;
+
+ @Override
+ public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
+ if (o1.getAction() == o2.getAction()) {
+ return 0;
+ }
+ return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index de7daea..b6d57cd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -25,19 +25,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.api.operators.InternalWatermarkCallbackService;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OnWatermarkCallback;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.PriorityQueue;
@@ -55,7 +61,7 @@ import java.util.PriorityQueue;
*/
public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<IN, OUT> {
+ implements OneInputStreamOperator<IN, OUT>, CheckpointedRestoringOperator {
private static final long serialVersionUID = -4166778210774160757L;
@@ -82,18 +88,26 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
private final NFACompiler.NFAFactory<IN> nfaFactory;
+ /**
+ * A flag used in the case of migration that indicates if
+ * we are restoring from an old keyed or non-keyed operator.
+ */
+ private final boolean migratingFromOldKeyedOperator;
+
public AbstractKeyedCEPPatternOperator(
final TypeSerializer<IN> inputSerializer,
final boolean isProcessingTime,
final KeySelector<IN, KEY> keySelector,
final TypeSerializer<KEY> keySerializer,
- final NFACompiler.NFAFactory<IN> nfaFactory) {
+ final NFACompiler.NFAFactory<IN> nfaFactory,
+ final boolean migratingFromOldKeyedOperator) {
this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
this.keySelector = Preconditions.checkNotNull(keySelector);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+ this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
}
public TypeSerializer<IN> getInputSerializer() {
@@ -104,15 +118,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- // we have to call initializeState here and in the migration restore()
- // method because the restore() (from legacy) is called before the
- // initializeState().
-
- initializeState();
- }
-
- private void initializeState() {
-
if (nfaOperatorState == null) {
nfaOperatorState = getRuntimeContext().getState(
new ValueStateDescriptor<>(
@@ -252,6 +257,57 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
*/
protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
+ ////////////////////// Backwards Compatibility //////////////////////
+
+ @Override
+ public void restoreState(FSDataInputStream in) throws Exception {
+ // this is the flag indicating if we have udf
+ // state to restore (not needed here)
+ in.read();
+
+ DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(in);
+ InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService();
+
+ if (migratingFromOldKeyedOperator) {
+ int numberEntries = inputView.readInt();
+ for (int i = 0; i <numberEntries; i++) {
+ watermarkCallbackService.registerKeyForWatermarkCallback(keySerializer.deserialize(inputView));
+ }
+ } else {
+
+ final ObjectInputStream ois = new ObjectInputStream(in);
+
+ // retrieve the NFA
+ @SuppressWarnings("unchecked")
+ NFA<IN> nfa = (NFA<IN>) ois.readObject();
+
+ // retrieve the elements that were pending in the priority queue
+ MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+ PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueFactory.createPriorityQueue();
+ int entries = ois.readInt();
+ for (int i = 0; i < entries; i++) {
+ StreamElement streamElement = recordSerializer.deserialize(inputView);
+ priorityQueue.offer(streamElement.<IN>asRecord());
+ }
+
+ // finally register the retrieved state with the new keyed state.
+ setCurrentKey((byte) 0);
+ nfaOperatorState.update(nfa);
+ priorityQueueOperatorState.update(priorityQueue);
+
+ if (!isProcessingTime) {
+ // this is relevant only for event/ingestion time
+
+ // need to work around type restrictions
+ InternalWatermarkCallbackService rawWatermarkCallbackService =
+ (InternalWatermarkCallbackService) watermarkCallbackService;
+
+ rawWatermarkCallbackService.registerKeyForWatermarkCallback((byte) 0);
+ }
+ ois.close();
+ }
+ }
+
////////////////////// Utility Classes //////////////////////
/**
@@ -348,6 +404,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
}
@Override
+ public boolean isCompatibleWith(TypeSerializer<?> other) {
+ return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj instanceof PriorityQueueSerializer) {
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 56ecb17..a5eef45 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -72,7 +72,8 @@ public class CEPOperatorUtils {
isProcessingTime,
keySelector,
keySerializer,
- nfaFactory));
+ nfaFactory,
+ true));
} else {
KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
@@ -86,7 +87,8 @@ public class CEPOperatorUtils {
isProcessingTime,
keySelector,
keySerializer,
- nfaFactory
+ nfaFactory,
+ false
)).forceNonParallel();
}
@@ -133,7 +135,8 @@ public class CEPOperatorUtils {
isProcessingTime,
keySelector,
keySerializer,
- nfaFactory));
+ nfaFactory,
+ true));
} else {
KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
@@ -147,7 +150,8 @@ public class CEPOperatorUtils {
isProcessingTime,
keySelector,
keySerializer,
- nfaFactory
+ nfaFactory,
+ false
)).forceNonParallel();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 5b6ffe2..21cee23 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -45,9 +45,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
boolean isProcessingTime,
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
- NFACompiler.NFAFactory<IN> nfaFactory) {
+ NFACompiler.NFAFactory<IN> nfaFactory,
+ boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
+ super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 6889bb9..c6fba55 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -45,9 +45,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
boolean isProcessingTime,
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
- NFACompiler.NFAFactory<IN> nfaFactory) {
+ NFACompiler.NFAFactory<IN> nfaFactory,
+ boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
+ super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
new file mode 100644
index 0000000..5a3e623
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration11to13Test {
+
+ private static String getResourceFilename(String filename) {
+ ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ if (resource == null) {
+ throw new NullPointerException("Missing snapshot resource.");
+ }
+ return resource.getFile();
+ }
+
+ @Test
+ public void testKeyedCEPOperatorMigratation() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent = new Event(42, "start", 1.0);
+ final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+ final Event endEvent = new Event(42, "end", 1.0);
+
+ // uncomment these lines for regenerating the snapshot on Flink 1.1
+ /*
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory()));
+ harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+ harness.open();
+ harness.processElement(new StreamRecord<Event>(startEvent, 1));
+ harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+ harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+ harness.processWatermark(new Watermark(2));
+ // simulate snapshot/restore with empty element queue but NFA state
+ StreamTaskState snapshot = harness.snapshot(1, 1);
+ FileOutputStream out = new FileOutputStream(
+ "src/test/resources/cep-keyed-snapshot-1.1");
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(snapshot);
+ out.close();
+ harness.close();
+ */
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory(),
+ true),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-snapshot-1.1"));
+ harness.open();
+
+ harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness.processElement(new StreamRecord<>(endEvent, 5));
+
+ harness.processWatermark(new Watermark(20));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and the result
+ assertEquals(2, result.size());
+
+ Object resultObject = result.poll();
+ assertTrue(resultObject instanceof StreamRecord);
+ StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+ assertTrue(resultRecord.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+ assertEquals(startEvent, patternMap.get("start"));
+ assertEquals(middleEvent, patternMap.get("middle"));
+ assertEquals(endEvent, patternMap.get("end"));
+
+ harness.close();
+ }
+
+ @Test
+ public void testNonKeyedCEPFunctionMigration() throws Exception {
+
+ final Event startEvent = new Event(42, "start", 1.0);
+ final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+ final Event endEvent= new Event(42, "end", 1.0);
+
+ // uncomment these lines for regenerating the snapshot on Flink 1.1
+ /*
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+ new CEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ new NFAFactory()));
+ harness.open();
+ harness.processElement(new StreamRecord<Event>(startEvent, 1));
+ harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+ harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+ harness.processWatermark(new Watermark(2));
+
+ // simulate snapshot/restore with empty element queue but NFA state
+ StreamTaskState snapshot = harness.snapshot(1, 1);
+ FileOutputStream out = new FileOutputStream(
+ "src/test/resources/cep-non-keyed-snapshot-1.1");
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(snapshot);
+ out.close();
+ harness.close();
+ */
+
+ NullByteKeySelector keySelector = new NullByteKeySelector();
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ ByteSerializer.INSTANCE,
+ new NFAFactory(),
+ false),
+ keySelector,
+ BasicTypeInfo.BYTE_TYPE_INFO);
+
+ harness.setup();
+ harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-snapshot-1.1"));
+ harness.open();
+
+ harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness.processElement(new StreamRecord<>(endEvent, 5));
+
+ harness.processWatermark(new Watermark(Long.MAX_VALUE));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and the result
+ assertEquals(2, result.size());
+
+ Object resultObject = result.poll();
+ assertTrue(resultObject instanceof StreamRecord);
+ StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+ assertTrue(resultRecord.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+ assertEquals(startEvent, patternMap.get("start"));
+ assertEquals(middleEvent, patternMap.get("middle"));
+ assertEquals(endEvent, patternMap.get("end"));
+
+ harness.close();
+ }
+
+ private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+ private static final long serialVersionUID = 1173020762472766713L;
+
+ private final boolean handleTimeout;
+
+ private NFAFactory() {
+ this(false);
+ }
+
+ private NFAFactory(boolean handleTimeout) {
+ this.handleTimeout = handleTimeout;
+ }
+
+ @Override
+ public NFA<Event> createNFA() {
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+ .followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter())
+ .followedBy("end").where(new EndFilter())
+ // add a window timeout to test whether timestamps of elements in the
+ // priority queue in CEP operator are correctly checkpointed/restored
+ .within(Time.milliseconds(10L));
+
+ return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ }
+ }
+
+ private static class StartFilter implements FilterFunction<Event> {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }
+
+ private static class MiddleFilter implements FilterFunction<SubEvent> {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(SubEvent value) throws Exception {
+ return value.getVolume() > 5.0;
+ }
+ }
+
+ private static class EndFilter implements FilterFunction<Event> {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 4ae74b9..a99db05 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -224,7 +224,8 @@ public class CEPOperatorTest extends TestLogger {
false,
keySelector,
IntSerializer.INSTANCE,
- new NFAFactory(true)),
+ new NFAFactory(true),
+ true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -480,7 +481,8 @@ public class CEPOperatorTest extends TestLogger {
isProcessingTime,
keySelector,
IntSerializer.INSTANCE,
- new NFAFactory());
+ new NFAFactory(),
+ true);
}
private static class TestKeySelector implements KeySelector<Event, Integer> {
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 78765c0..399662a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -345,7 +345,8 @@ public class CEPRescalingTest {
false,
keySelector,
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
- new NFAFactory()),
+ new NFAFactory(),
+ true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO,
maxParallelism,
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
new file mode 100644
index 0000000..277de1d
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 differ
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
new file mode 100644
index 0000000..b5ca51e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 differ
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
new file mode 100644
index 0000000..a0f5a60
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
+
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int TAG_REC_WITH_TIMESTAMP = 0;
+ private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
+ private static final int TAG_WATERMARK = 2;
+
+
+ private final TypeSerializer<T> typeSerializer;
+
+
+ public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
+ if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
+ throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+ }
+ this.typeSerializer = requireNonNull(serializer);
+ }
+
+ public TypeSerializer<T> getContainedTypeSerializer() {
+ return this.typeSerializer;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public MultiplexingStreamRecordSerializer<T> duplicate() {
+ TypeSerializer<T> copy = typeSerializer.duplicate();
+ return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamRecord<T> createInstance() {
+ return new StreamRecord<T>(typeSerializer.createInstance());
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public StreamElement copy(StreamElement from) {
+ // we can reuse the timestamp since Instant is immutable
+ if (from.isRecord()) {
+ StreamRecord<T> fromRecord = from.asRecord();
+ return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
+ }
+ else if (from.isWatermark()) {
+ // is immutable
+ return from;
+ }
+ else {
+ throw new RuntimeException();
+ }
+ }
+
+ @Override
+ public StreamElement copy(StreamElement from, StreamElement reuse) {
+ if (from.isRecord() && reuse.isRecord()) {
+ StreamRecord<T> fromRecord = from.asRecord();
+ StreamRecord<T> reuseRecord = reuse.asRecord();
+
+ T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+ fromRecord.copyTo(valueCopy, reuseRecord);
+ return reuse;
+ }
+ else if (from.isWatermark()) {
+ // is immutable
+ return from;
+ }
+ else {
+ throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
+ }
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int tag = source.readByte();
+ target.write(tag);
+
+ if (tag == TAG_REC_WITH_TIMESTAMP) {
+ // move timestamp
+ target.writeLong(source.readLong());
+ typeSerializer.copy(source, target);
+ }
+ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+ typeSerializer.copy(source, target);
+ }
+ else if (tag == TAG_WATERMARK) {
+ target.writeLong(source.readLong());
+ }
+ else {
+ throw new IOException("Corrupt stream, found tag: " + tag);
+ }
+ }
+
+ @Override
+ public void serialize(StreamElement value, DataOutputView target) throws IOException {
+ if (value.isRecord()) {
+ StreamRecord<T> record = value.asRecord();
+
+ if (record.hasTimestamp()) {
+ target.write(TAG_REC_WITH_TIMESTAMP);
+ target.writeLong(record.getTimestamp());
+ } else {
+ target.write(TAG_REC_WITHOUT_TIMESTAMP);
+ }
+ typeSerializer.serialize(record.getValue(), target);
+ }
+ else if (value.isWatermark()) {
+ target.write(TAG_WATERMARK);
+ target.writeLong(value.asWatermark().getTimestamp());
+ }
+ else {
+ throw new RuntimeException();
+ }
+ }
+
+ @Override
+ public StreamElement deserialize(DataInputView source) throws IOException {
+ int tag = source.readByte();
+ if (tag == TAG_REC_WITH_TIMESTAMP) {
+ long timestamp = source.readLong();
+ return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
+ }
+ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+ return new StreamRecord<T>(typeSerializer.deserialize(source));
+ }
+ else if (tag == TAG_WATERMARK) {
+ return new Watermark(source.readLong());
+ }
+ else {
+ throw new IOException("Corrupt stream, found tag: " + tag);
+ }
+ }
+
+ @Override
+ public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
+ int tag = source.readByte();
+ if (tag == TAG_REC_WITH_TIMESTAMP) {
+ long timestamp = source.readLong();
+ T value = typeSerializer.deserialize(source);
+ StreamRecord<T> reuseRecord = reuse.asRecord();
+ reuseRecord.replace(value, timestamp);
+ return reuseRecord;
+ }
+ else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+ T value = typeSerializer.deserialize(source);
+ StreamRecord<T> reuseRecord = reuse.asRecord();
+ reuseRecord.replace(value);
+ return reuseRecord;
+ }
+ else if (tag == TAG_WATERMARK) {
+ return new Watermark(source.readLong());
+ }
+ else {
+ throw new IOException("Corrupt stream, found tag: " + tag);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof MultiplexingStreamRecordSerializer) {
+ MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
+
+ return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof MultiplexingStreamRecordSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return typeSerializer.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
new file mode 100644
index 0000000..0235ab8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
+ * the element.
+ *
+ * <p>
+ * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
+ * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
+ * stream with {@link StreamRecord StreamRecords}.
+ *
+ * @see MultiplexingStreamRecordSerializer
+ *
+ * @param <T> The type of value in the {@link StreamRecord}
+ */
+@Internal
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<T> typeSerializer;
+
+
+ public StreamRecordSerializer(TypeSerializer<T> serializer) {
+ if (serializer instanceof StreamRecordSerializer) {
+ throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+ }
+ this.typeSerializer = Preconditions.checkNotNull(serializer);
+ }
+
+ public TypeSerializer<T> getContainedTypeSerializer() {
+ return this.typeSerializer;
+ }
+
+ // ------------------------------------------------------------------------
+ // General serializer and type utils
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamRecordSerializer<T> duplicate() {
+ TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
+ return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public int getLength() {
+ return typeSerializer.getLength();
+ }
+
+ // ------------------------------------------------------------------------
+ // Type serialization, copying, instantiation
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamRecord<T> createInstance() {
+ try {
+ return new StreamRecord<T>(typeSerializer.createInstance());
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot instantiate StreamRecord.", e);
+ }
+ }
+
+ @Override
+ public StreamRecord<T> copy(StreamRecord<T> from) {
+ return from.copy(typeSerializer.copy(from.getValue()));
+ }
+
+ @Override
+ public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
+ from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse);
+ return reuse;
+ }
+
+ @Override
+ public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
+ typeSerializer.serialize(value.getValue(), target);
+ }
+
+ @Override
+ public StreamRecord<T> deserialize(DataInputView source) throws IOException {
+ return new StreamRecord<T>(typeSerializer.deserialize(source));
+ }
+
+ @Override
+ public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
+ T element = typeSerializer.deserialize(reuse.getValue(), source);
+ reuse.replace(element);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ typeSerializer.copy(source, target);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof StreamRecordSerializer) {
+ StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
+
+ return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof StreamRecordSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return typeSerializer.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 661a72d..538aa4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -911,6 +911,8 @@ under the License.
<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot</exclude>
<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint</exclude>
<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb</exclude>
+ <exclude>flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1</exclude>
+ <exclude>flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1</exclude>
<!-- TweetInputFormat Test Data-->
<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>