You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/08/24 18:22:38 UTC
[07/11] flink git commit: [FLINK-7461] Remove Backwards compatibility
with <= Flink 1.1
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
deleted file mode 100644
index 16f3769..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
-import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.memory.MemValueState;
-import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
-import org.apache.flink.migration.util.MigrationInstantiationUtil;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("deprecation")
-public class MigrationV0ToV1Test {
-
- @Rule
- public TemporaryFolder tmp = new TemporaryFolder();
-
- /**
- * Simple test of savepoint methods.
- */
- @Test
- public void testSavepointMigrationV0ToV1() throws Exception {
-
- String target = tmp.getRoot().getAbsolutePath();
-
- assertEquals(0, tmp.getRoot().listFiles().length);
-
- long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
- int numTaskStates = 4;
- int numSubtaskStates = 16;
-
- Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> expected =
- createTaskStatesOld(numTaskStates, numSubtaskStates);
-
- SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
- assertEquals(SavepointV0.VERSION, savepoint.getVersion());
- assertEquals(checkpointId, savepoint.getCheckpointId());
- assertEquals(expected, savepoint.getOldTaskStates());
-
- assertFalse(savepoint.getOldTaskStates().isEmpty());
-
- Exception latestException = null;
- Path path = null;
- FSDataOutputStream fdos = null;
-
- FileSystem fs = null;
-
- try {
-
- // Try to create a FS output stream
- for (int attempt = 0; attempt < 10; attempt++) {
- path = new Path(target, FileUtils.getRandomFilename("savepoint-"));
-
- if (fs == null) {
- fs = FileSystem.get(path.toUri());
- }
-
- try {
- fdos = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
- break;
- } catch (Exception e) {
- latestException = e;
- }
- }
-
- if (fdos == null) {
- throw new IOException("Failed to create file output stream at " + path, latestException);
- }
-
- try (DataOutputStream dos = new DataOutputStream(fdos)) {
- dos.writeInt(SavepointStore.MAGIC_NUMBER);
- dos.writeInt(savepoint.getVersion());
- SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos);
- }
-
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
- Savepoint sp = SavepointStore.loadSavepoint(path.toString(), cl);
- int t = 0;
- for (TaskState taskState : sp.getTaskStates()) {
- for (int p = 0; p < taskState.getParallelism(); ++p) {
- SubtaskState subtaskState = taskState.getState(p);
- ChainedStateHandle<StreamStateHandle> legacyOperatorState = subtaskState.getLegacyOperatorState();
- for (int c = 0; c < legacyOperatorState.getLength(); ++c) {
- StreamStateHandle stateHandle = legacyOperatorState.get(c);
- try (InputStream is = stateHandle.openInputStream()) {
- Tuple4<Integer, Integer, Integer, Integer> expTestState = new Tuple4<>(0, t, p, c);
- Tuple4<Integer, Integer, Integer, Integer> actTestState;
- //check function state
- if (p % 4 != 0) {
- assertEquals(1, is.read());
- actTestState = InstantiationUtil.deserializeObject(is, cl);
- assertEquals(expTestState, actTestState);
- } else {
- assertEquals(0, is.read());
- }
-
- //check operator state
- expTestState.f0 = 1;
- actTestState = InstantiationUtil.deserializeObject(is, cl);
- assertEquals(expTestState, actTestState);
- }
- }
-
- //check keyed state
- KeyedStateHandle keyedStateHandle = subtaskState.getManagedKeyedState();
-
- if (t % 3 != 0) {
-
- assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
-
- KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
-
- assertEquals(1, keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
- assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
-
- ByteStreamStateHandle stateHandle =
- (ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle();
- HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState =
- MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl);
-
- assertEquals(2, testKeyedState.size());
- for (KvStateSnapshot<?, ?, ?, ?> snapshot : testKeyedState.values()) {
- MemValueState.Snapshot<?, ?, ?> castedSnapshot = (MemValueState.Snapshot<?, ?, ?>) snapshot;
- byte[] data = castedSnapshot.getData();
- assertEquals(t, data[0]);
- assertEquals(p, data[1]);
- }
- } else {
- assertEquals(null, keyedStateHandle);
- }
- }
-
- ++t;
- }
-
- savepoint.dispose();
-
- } finally {
- // Dispose
- SavepointStore.removeSavepointFile(path.toString());
- }
- }
-
- private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> createTaskStatesOld(
- int numTaskStates, int numSubtaskStates) throws Exception {
-
- List<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = new ArrayList<>(numTaskStates);
-
- for (int i = 0; i < numTaskStates; i++) {
- org.apache.flink.migration.runtime.checkpoint.TaskState taskState =
- new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates);
- for (int j = 0; j < numSubtaskStates; j++) {
-
- StreamTaskState[] streamTaskStates = new StreamTaskState[2];
-
- for (int k = 0; k < streamTaskStates.length; k++) {
- StreamTaskState state = new StreamTaskState();
- Tuple4<Integer, Integer, Integer, Integer> testState = new Tuple4<>(0, i, j, k);
- if (j % 4 != 0) {
- state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
- }
- testState = new Tuple4<>(1, i, j, k);
- state.setOperatorState(new SerializedStateHandle<>(testState));
-
- if ((0 == k) && (i % 3 != 0)) {
- HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
- for (int l = 0; l < 2; ++l) {
- String name = "keyed-" + l;
- KvStateSnapshot<?, ?, ?, ?> testKeyedSnapshot =
- new MemValueState.Snapshot<>(
- IntSerializer.INSTANCE,
- VoidNamespaceSerializer.INSTANCE,
- IntSerializer.INSTANCE,
- new ValueStateDescriptor<>(name, Integer.class, 0),
- new byte[]{(byte) i, (byte) j});
- testKeyedState.put(name, testKeyedSnapshot);
- }
- state.setKvStates(testKeyedState);
- }
- streamTaskStates[k] = state;
- }
-
- StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates);
- org.apache.flink.migration.util.SerializedValue<
- org.apache.flink.migration.runtime.state.StateHandle<?>> handle =
- new org.apache.flink.migration.util.SerializedValue<
- org.apache.flink.migration.runtime.state.StateHandle<?>>(streamTaskStateList);
-
- taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0, 0));
- }
-
- taskStates.add(taskState);
- }
-
- return taskStates;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 933c7a0..173730a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorRef;
@@ -98,6 +100,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -560,7 +563,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
TaskStateSnapshot taskStateHandles) throws Exception {
int subtaskIndex = getIndexInSubtaskGroup();
if (subtaskIndex < recoveredStates.length) {
- try (FSDataInputStream in = taskStateHandles.getSubtaskStateMappings().iterator().next().getValue().getLegacyOperatorState().openInputStream()) {
+ OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(taskStateHandles);
+ try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
}
}
@@ -572,11 +576,21 @@ public class JobManagerHARecoveryTest extends TestLogger {
String.valueOf(UUID.randomUUID()),
InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
+ Map<String, OperatorStateHandle.StateMetaInfo> stateNameToPartitionOffsets = new HashMap<>(1);
+ stateNameToPartitionOffsets.put(
+ "test-state",
+ new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+
+ OperatorStateHandle operatorStateHandle = new OperatorStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle);
+
TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
checkpointStateHandles.putSubtaskStateByOperatorID(
OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
- new OperatorSubtaskState(byteStreamStateHandle)
- );
+ new OperatorSubtaskState(
+ Collections.singletonList(operatorStateHandle),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()));
getEnvironment().acknowledgeCheckpoint(
checkpointMetaData.getCheckpointId(),
@@ -614,5 +628,17 @@ public class JobManagerHARecoveryTest extends TestLogger {
public static long[] getRecoveredStates() {
return recoveredStates;
}
+
+ private static OperatorStateHandle extractSingletonOperatorState(TaskStateSnapshot taskStateHandles) {
+ Set<Map.Entry<OperatorID, OperatorSubtaskState>> subtaskStateMappings = taskStateHandles.getSubtaskStateMappings();
+ Preconditions.checkNotNull(subtaskStateMappings);
+ Preconditions.checkState(subtaskStateMappings.size() == 1);
+ OperatorSubtaskState subtaskState = subtaskStateMappings.iterator().next().getValue();
+ Collection<OperatorStateHandle> managedOperatorState =
+ Preconditions.checkNotNull(subtaskState).getManagedOperatorState();
+ Preconditions.checkNotNull(managedOperatorState);
+ Preconditions.checkState(managedOperatorState.size() == 1);
+ return managedOperatorState.iterator().next();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index d022cdc..b36ac86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -75,7 +75,6 @@ public class CheckpointMessagesTest {
checkpointStateHandles.putSubtaskStateByOperatorID(
new OperatorID(),
new OperatorSubtaskState(
- CheckpointCoordinatorTest.generateStreamStateHandle(new MyHandle()),
CheckpointCoordinatorTest.generatePartitionableStateHandle(new JobVertexID(), 0, 2, 8, false),
null,
CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, Collections.singletonList(new MyHandle())),
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
deleted file mode 100644
index dd6148c..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.api.graph;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphHasher;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-
-import org.apache.flink.shaded.guava18.com.google.common.hash.HashFunction;
-import org.apache.flink.shaded.guava18.com.google.common.hash.Hasher;
-import org.apache.flink.shaded.guava18.com.google.common.hash.Hashing;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.flink.util.StringUtils.byteToHexString;
-
-/**
- * StreamGraphHasher from Flink 1.1. This contains duplicated code to ensure that the algorithm does not change with
- * future Flink versions.
- *
- * <p>DO NOT MODIFY THIS CLASS
- */
-public class StreamGraphHasherV1 implements StreamGraphHasher {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV1.class);
-
- @Override
- public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
- // The hash function used to generate the hash
- final HashFunction hashFunction = Hashing.murmur3_128(0);
- final Map<Integer, byte[]> hashes = new HashMap<>();
-
- Set<Integer> visited = new HashSet<>();
- Queue<StreamNode> remaining = new ArrayDeque<>();
-
- // We need to make the source order deterministic. The source IDs are
- // not returned in the same order, which means that submitting the same
- // program twice might result in different traversal, which breaks the
- // deterministic hash assignment.
- List<Integer> sources = new ArrayList<>();
- for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
- sources.add(sourceNodeId);
- }
- Collections.sort(sources);
-
- //
- // Traverse the graph in a breadth-first manner. Keep in mind that
- // the graph is not a tree and multiple paths to nodes can exist.
- //
-
- // Start with source nodes
- for (Integer sourceNodeId : sources) {
- remaining.add(streamGraph.getStreamNode(sourceNodeId));
- visited.add(sourceNodeId);
- }
-
- StreamNode currentNode;
- while ((currentNode = remaining.poll()) != null) {
- // Generate the hash code. Because multiple path exist to each
- // node, we might not have all required inputs available to
- // generate the hash code.
- if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
- // Add the child nodes
- for (StreamEdge outEdge : currentNode.getOutEdges()) {
- StreamNode child = outEdge.getTargetVertex();
-
- if (!visited.contains(child.getId())) {
- remaining.add(child);
- visited.add(child.getId());
- }
- }
- } else {
- // We will revisit this later.
- visited.remove(currentNode.getId());
- }
- }
-
- return hashes;
- }
-
- /**
- * Generates a hash for the node and returns whether the operation was
- * successful.
- *
- * @param node The node to generate the hash for
- * @param hashFunction The hash function to use
- * @param hashes The current state of generated hashes
- * @return <code>true</code> if the node hash has been generated.
- * <code>false</code>, otherwise. If the operation is not successful, the
- * hash needs be generated at a later point when all input is available.
- * @throws IllegalStateException If node has user-specified hash and is
- * intermediate node of a chain
- */
- private boolean generateNodeHash(
- StreamNode node,
- HashFunction hashFunction,
- Map<Integer, byte[]> hashes,
- boolean isChainingEnabled) {
-
- // Check for user-specified ID
- String userSpecifiedHash = node.getTransformationUID();
-
- if (userSpecifiedHash == null) {
- // Check that all input nodes have their hashes computed
- for (StreamEdge inEdge : node.getInEdges()) {
- // If the input node has not been visited yet, the current
- // node will be visited again at a later point when all input
- // nodes have been visited and their hashes set.
- if (!hashes.containsKey(inEdge.getSourceId())) {
- return false;
- }
- }
-
- Hasher hasher = hashFunction.newHasher();
- byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
-
- if (hashes.put(node.getId(), hash) != null) {
- // Sanity check
- throw new IllegalStateException("Unexpected state. Tried to add node hash " +
- "twice. This is probably a bug in the JobGraph generator.");
- }
-
- return true;
- } else {
- Hasher hasher = hashFunction.newHasher();
- byte[] hash = generateUserSpecifiedHash(node, hasher);
-
- for (byte[] previousHash : hashes.values()) {
- if (Arrays.equals(previousHash, hash)) {
- throw new IllegalArgumentException("Hash collision on user-specified ID. " +
- "Most likely cause is a non-unique ID. Please check that all IDs " +
- "specified via `uid(String)` are unique.");
- }
- }
-
- if (hashes.put(node.getId(), hash) != null) {
- // Sanity check
- throw new IllegalStateException("Unexpected state. Tried to add node hash " +
- "twice. This is probably a bug in the JobGraph generator.");
- }
-
- return true;
- }
- }
-
- /**
- * Generates a hash from a user-specified ID.
- */
- private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
- hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
-
- return hasher.hash().asBytes();
- }
-
- /**
- * Generates a deterministic hash from node-local properties and input and
- * output edges.
- */
- private byte[] generateDeterministicHash(
- StreamNode node,
- Hasher hasher,
- Map<Integer, byte[]> hashes,
- boolean isChainingEnabled) {
-
- // Include stream node to hash. We use the current size of the computed
- // hashes as the ID. We cannot use the node's ID, because it is
- // assigned from a static counter. This will result in two identical
- // programs having different hashes.
- generateNodeLocalHash(node, hasher, hashes.size());
-
- // Include chained nodes to hash
- for (StreamEdge outEdge : node.getOutEdges()) {
- if (isChainable(outEdge, isChainingEnabled)) {
- StreamNode chainedNode = outEdge.getTargetVertex();
-
- // Use the hash size again, because the nodes are chained to
- // this node. This does not add a hash for the chained nodes.
- generateNodeLocalHash(chainedNode, hasher, hashes.size());
- }
- }
-
- byte[] hash = hasher.hash().asBytes();
-
- // Make sure that all input nodes have their hash set before entering
- // this loop (calling this method).
- for (StreamEdge inEdge : node.getInEdges()) {
- byte[] otherHash = hashes.get(inEdge.getSourceId());
-
- // Sanity check
- if (otherHash == null) {
- throw new IllegalStateException("Missing hash for input node "
- + inEdge.getSourceVertex() + ". Cannot generate hash for "
- + node + ".");
- }
-
- for (int j = 0; j < hash.length; j++) {
- hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
- }
- }
-
- if (LOG.isDebugEnabled()) {
- String udfClassName = "";
- if (node.getOperator() instanceof AbstractUdfStreamOperator) {
- udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
- .getUserFunction().getClass().getName();
- }
-
- LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
- "'" + node.toString() + "' {id: " + node.getId() + ", " +
- "parallelism: " + node.getParallelism() + ", " +
- "user function: " + udfClassName + "}");
- }
-
- return hash;
- }
-
- private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
- StreamNode upStreamVertex = edge.getSourceVertex();
- StreamNode downStreamVertex = edge.getTargetVertex();
-
- StreamOperator<?> headOperator = upStreamVertex.getOperator();
- StreamOperator<?> outOperator = downStreamVertex.getOperator();
-
- return downStreamVertex.getInEdges().size() == 1
- && outOperator != null
- && headOperator != null
- && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
- && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
- && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
- headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
- && (edge.getPartitioner() instanceof ForwardPartitioner)
- && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
- && isChainingEnabled;
- }
-
- private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
- hasher.putInt(id);
-
- hasher.putInt(node.getParallelism());
-
- if (node.getOperator() instanceof AbstractUdfStreamOperator) {
- String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
- .getUserFunction().getClass().getName();
-
- hasher.putString(udfClassName, Charset.forName("UTF-8"));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
deleted file mode 100644
index b1471b2..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.streamrecord;
-
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Legacy multiplexing {@link TypeSerializer} for stream records, watermarks and other stream
- * elements.
- */
-public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
-
-
- private static final long serialVersionUID = 1L;
-
- private static final int TAG_REC_WITH_TIMESTAMP = 0;
- private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
- private static final int TAG_WATERMARK = 2;
-
-
- private final TypeSerializer<T> typeSerializer;
-
- public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
- if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
- throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
- }
- this.typeSerializer = requireNonNull(serializer);
- }
-
- public TypeSerializer<T> getContainedTypeSerializer() {
- return this.typeSerializer;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public MultiplexingStreamRecordSerializer<T> duplicate() {
- TypeSerializer<T> copy = typeSerializer.duplicate();
- return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public StreamRecord<T> createInstance() {
- return new StreamRecord<T>(typeSerializer.createInstance());
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public StreamElement copy(StreamElement from) {
- // we can reuse the timestamp since Instant is immutable
- if (from.isRecord()) {
- StreamRecord<T> fromRecord = from.asRecord();
- return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
- }
- else if (from.isWatermark()) {
- // is immutable
- return from;
- }
- else {
- throw new RuntimeException();
- }
- }
-
- @Override
- public StreamElement copy(StreamElement from, StreamElement reuse) {
- if (from.isRecord() && reuse.isRecord()) {
- StreamRecord<T> fromRecord = from.asRecord();
- StreamRecord<T> reuseRecord = reuse.asRecord();
-
- T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
- fromRecord.copyTo(valueCopy, reuseRecord);
- return reuse;
- }
- else if (from.isWatermark()) {
- // is immutable
- return from;
- }
- else {
- throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
- }
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- int tag = source.readByte();
- target.write(tag);
-
- if (tag == TAG_REC_WITH_TIMESTAMP) {
- // move timestamp
- target.writeLong(source.readLong());
- typeSerializer.copy(source, target);
- }
- else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
- typeSerializer.copy(source, target);
- }
- else if (tag == TAG_WATERMARK) {
- target.writeLong(source.readLong());
- }
- else {
- throw new IOException("Corrupt stream, found tag: " + tag);
- }
- }
-
- @Override
- public void serialize(StreamElement value, DataOutputView target) throws IOException {
- if (value.isRecord()) {
- StreamRecord<T> record = value.asRecord();
-
- if (record.hasTimestamp()) {
- target.write(TAG_REC_WITH_TIMESTAMP);
- target.writeLong(record.getTimestamp());
- } else {
- target.write(TAG_REC_WITHOUT_TIMESTAMP);
- }
- typeSerializer.serialize(record.getValue(), target);
- }
- else if (value.isWatermark()) {
- target.write(TAG_WATERMARK);
- target.writeLong(value.asWatermark().getTimestamp());
- }
- else {
- throw new RuntimeException();
- }
- }
-
- @Override
- public StreamElement deserialize(DataInputView source) throws IOException {
- int tag = source.readByte();
- if (tag == TAG_REC_WITH_TIMESTAMP) {
- long timestamp = source.readLong();
- return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
- }
- else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
- return new StreamRecord<T>(typeSerializer.deserialize(source));
- }
- else if (tag == TAG_WATERMARK) {
- return new Watermark(source.readLong());
- }
- else {
- throw new IOException("Corrupt stream, found tag: " + tag);
- }
- }
-
- @Override
- public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
- int tag = source.readByte();
- if (tag == TAG_REC_WITH_TIMESTAMP) {
- long timestamp = source.readLong();
- T value = typeSerializer.deserialize(source);
- StreamRecord<T> reuseRecord = reuse.asRecord();
- reuseRecord.replace(value, timestamp);
- return reuseRecord;
- }
- else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
- T value = typeSerializer.deserialize(source);
- StreamRecord<T> reuseRecord = reuse.asRecord();
- reuseRecord.replace(value);
- return reuseRecord;
- }
- else if (tag == TAG_WATERMARK) {
- return new Watermark(source.readLong());
- }
- else {
- throw new IOException("Corrupt stream, found tag: " + tag);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Serializer configuration snapshotting & compatibility
- // --------------------------------------------------------------------------------------------
-
- @Override
- public MultiplexingStreamRecordSerializerConfigSnapshot snapshotConfiguration() {
- return new MultiplexingStreamRecordSerializerConfigSnapshot<>(typeSerializer);
- }
-
- @Override
- public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof MultiplexingStreamRecordSerializerConfigSnapshot) {
- Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig =
- ((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
- CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousTypeSerializerAndConfig.f0,
- UnloadableDummyTypeSerializer.class,
- previousTypeSerializerAndConfig.f1,
- typeSerializer);
-
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new MultiplexingStreamRecordSerializer<>(
- new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
- }
- }
-
- return CompatibilityResult.requiresMigration();
- }
-
- /**
- * Configuration snapshot specific to the {@link MultiplexingStreamRecordSerializer}.
- */
- public static final class MultiplexingStreamRecordSerializerConfigSnapshot<T>
- extends CompositeTypeSerializerConfigSnapshot {
-
- private static final int VERSION = 1;
-
- /** This empty nullary constructor is required for deserializing the configuration. */
- public MultiplexingStreamRecordSerializerConfigSnapshot() {}
-
- public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
- super(typeSerializer);
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof MultiplexingStreamRecordSerializer) {
- MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
-
- return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof MultiplexingStreamRecordSerializer;
- }
-
- @Override
- public int hashCode() {
- return typeSerializer.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
deleted file mode 100644
index e018ba0..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.streamrecord;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
- * the element.
- *
- * <p>{@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
- * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
- * stream with {@link StreamRecord StreamRecords}.
- *
- * @see MultiplexingStreamRecordSerializer
- *
- * @param <T> The type of value in the {@link StreamRecord}
- */
-@Internal
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
-
- private static final long serialVersionUID = 1L;
-
- private final TypeSerializer<T> typeSerializer;
-
- public StreamRecordSerializer(TypeSerializer<T> serializer) {
- if (serializer instanceof StreamRecordSerializer) {
- throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
- }
- this.typeSerializer = Preconditions.checkNotNull(serializer);
- }
-
- public TypeSerializer<T> getContainedTypeSerializer() {
- return this.typeSerializer;
- }
-
- // ------------------------------------------------------------------------
- // General serializer and type utils
- // ------------------------------------------------------------------------
-
- @Override
- public StreamRecordSerializer<T> duplicate() {
- TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
- return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public int getLength() {
- return typeSerializer.getLength();
- }
-
- // ------------------------------------------------------------------------
- // Type serialization, copying, instantiation
- // ------------------------------------------------------------------------
-
- @Override
- public StreamRecord<T> createInstance() {
- try {
- return new StreamRecord<T>(typeSerializer.createInstance());
- } catch (Exception e) {
- throw new RuntimeException("Cannot instantiate StreamRecord.", e);
- }
- }
-
- @Override
- public StreamRecord<T> copy(StreamRecord<T> from) {
- return from.copy(typeSerializer.copy(from.getValue()));
- }
-
- @Override
- public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
- from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse);
- return reuse;
- }
-
- @Override
- public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
- typeSerializer.serialize(value.getValue(), target);
- }
-
- @Override
- public StreamRecord<T> deserialize(DataInputView source) throws IOException {
- return new StreamRecord<T>(typeSerializer.deserialize(source));
- }
-
- @Override
- public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
- T element = typeSerializer.deserialize(reuse.getValue(), source);
- reuse.replace(element);
- return reuse;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- typeSerializer.copy(source, target);
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof StreamRecordSerializer) {
- StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
-
- return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof StreamRecordSerializer;
- }
-
- @Override
- public int hashCode() {
- return typeSerializer.hashCode();
- }
-
- // --------------------------------------------------------------------------------------------
- // Serializer configuration snapshotting & compatibility
- // --------------------------------------------------------------------------------------------
-
- @Override
- public StreamRecordSerializerConfigSnapshot snapshotConfiguration() {
- return new StreamRecordSerializerConfigSnapshot<>(typeSerializer);
- }
-
- @Override
- public CompatibilityResult<StreamRecord<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof StreamRecordSerializerConfigSnapshot) {
- Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig =
- ((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
- CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousTypeSerializerAndConfig.f0,
- UnloadableDummyTypeSerializer.class,
- previousTypeSerializerAndConfig.f1,
- typeSerializer);
-
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new StreamRecordSerializer<>(
- new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
- }
- }
-
- return CompatibilityResult.requiresMigration();
- }
-
- /**
- * Configuration snapshot specific to the {@link StreamRecordSerializer}.
- */
- public static final class StreamRecordSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
-
- private static final int VERSION = 1;
-
- /** This empty nullary constructor is required for deserializing the configuration. */
- public StreamRecordSerializerConfigSnapshot() {}
-
- public StreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
- super(typeSerializer);
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
deleted file mode 100644
index cb3c7cc..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This method must be implemented by functions that have state that needs to be
- * checkpointed. The functions get a call whenever a checkpoint should take place
- * and return a snapshot of their state, which will be checkpointed.
- *
- * <h1>Deprecation and Replacement</h1>
- * The short cut replacement for this interface is via {@link ListCheckpointed} and works
- * as shown in the example below. The {@code ListCheckpointed} interface returns a list of
- * elements (
- *
- * <p><pre>{@code
- * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
- *
- * private int count;
- *
- * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- * return Collections.singletonList(this.count);
- * }
- *
- * public void restoreState(List<Integer> state) throws Exception {
- * this.value = state.isEmpty() ? 0 : state.get(0);
- * }
- *
- * public T map(T value) {
- * count++;
- * return value;
- * }
- * }
- * }</pre>
- *
- * @param <T> The type of the operator state.
- *
- * @deprecated Please use {@link ListCheckpointed} as illustrated above, or
- * {@link CheckpointedFunction} for more control over the checkpointing process.
- */
-@Deprecated
-@PublicEvolving
-public interface Checkpointed<T extends Serializable> extends CheckpointedRestoring<T> {
-
- /**
- * Gets the current state of the function of operator. The state must reflect the result of all
- * prior invocations to this function.
- *
- * @param checkpointId The ID of the checkpoint.
- * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
- * System.currentTimeMillis() on the JobManager.
- *
- * @return A snapshot of the operator state.
- *
- * @throws Exception Thrown if the creation of the state object failed. This causes the
- * checkpoint to fail. The system may decide to fail the operation (and trigger
- * recovery), or to discard this checkpoint attempt and to continue running
- * and to try again with the next checkpoint attempt.
- */
- T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
deleted file mode 100644
index 5138b49..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This interface marks a function/operator as checkpointed similar to the
- * {@link Checkpointed} interface, but gives the Flink framework the option to
- * perform the checkpoint asynchronously. Note that asynchronous checkpointing for
- * this interface has not been implemented.
- *
- * <h1>Deprecation and Replacement</h1>
- * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
- * as shown in the example below. Please refer to the JavaDocs of {@link ListCheckpointed} for
- * a more detailed description of how to use the new interface.
- *
- * <p><pre>{@code
- * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
- *
- * private int count;
- *
- * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- * return Collections.singletonList(this.count);
- * }
- *
- * public void restoreState(List<Integer> state) throws Exception {
- * this.value = state.isEmpty() ? 0 : state.get(0);
- * }
- *
- * public T map(T value) {
- * count++;
- * return value;
- * }
- * }
- * }</pre>
- *
- * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction} instead,
- * as illustrated in the example above.
- */
-@Deprecated
-@PublicEvolving
-public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
deleted file mode 100644
index cfaa505..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This deprecated interface contains the methods for restoring from the legacy checkpointing mechanism of state.
- * @param <T> type of the restored state.
- *
- * @deprecated Please use {@link CheckpointedFunction} or {@link ListCheckpointed} after restoring your legacy state.
- */
-@Deprecated
-@PublicEvolving
-public interface CheckpointedRestoring<T extends Serializable> {
- /**
- * Restores the state of the function or operator to that of a previous checkpoint.
- * This method is invoked when a function is executed as part of a recovery run.
- *
- * <p>Note that restoreState() is called before open().
- *
- * @param state The state to be restored.
- */
- void restoreState(T state) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
deleted file mode 100644
index bb6e4bc..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-/**
- * For specifying what type of window operator was used to create the state
- * that a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}
- * is restoring from. This is used to signal that state written using an aligned processing-time
- * window operator should be restored.
- */
-public enum LegacyWindowOperatorType {
-
- FAST_ACCUMULATING(true, false),
-
- FAST_AGGREGATING(false, true),
-
- NONE(false, false);
-
- // ------------------------------------------------------------------------
-
- private final boolean fastAccumulating;
- private final boolean fastAggregating;
-
- LegacyWindowOperatorType(boolean fastAccumulating, boolean fastAggregating) {
- this.fastAccumulating = fastAccumulating;
- this.fastAggregating = fastAggregating;
- }
-
- public boolean isFastAccumulating() {
- return fastAccumulating;
- }
-
- public boolean isFastAggregating() {
- return fastAggregating;
- }
-
- @Override
- public String toString() {
- if (fastAccumulating) {
- return "AccumulatingProcessingTimeWindowOperator";
- } else if (fastAggregating) {
- return "AggregatingProcessingTimeWindowOperator";
- } else {
- return "WindowOperator";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 348861f..f904a10 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -50,19 +49,11 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
@@ -227,33 +218,7 @@ public class WindowedStream<T, K, W extends Window> {
//clean the closure
function = input.getExecutionEnvironment().clean(function);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "WindowedStream." + callLocation;
-
- SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function);
- return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>Arriving data is incrementally aggregated using the given reducer.
- *
- * @param reduceFunction The reduce function that is used for incremental aggregation.
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- @PublicEvolving
- public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
- return reduce(reduceFunction, function, LegacyWindowOperatorType.NONE);
+ return reduce(function, new PassThroughWindowFunction<K, W, T>());
}
/**
@@ -265,39 +230,15 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param reduceFunction The reduce function that is used for incremental aggregation.
* @param function The window function.
- * @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
- @PublicEvolving
public <R> SingleOutputStreamOperator<R> reduce(
- ReduceFunction<T> reduceFunction,
- WindowFunction<T, R, K, W> function,
- TypeInformation<R> resultType) {
- return reduce(reduceFunction, function, resultType, LegacyWindowOperatorType.NONE);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>Arriving data is incrementally aggregated using the given reducer.
- *
- * @param reduceFunction The reduce function that is used for incremental aggregation.
- * @param function The window function.
- * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
- * the type of the previous operator whose state we inherit.
- * @return The data stream that is the result of applying the window function to the window.
- */
- private <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,
- WindowFunction<T, R, K, W> function,
- LegacyWindowOperatorType legacyWindowOpType) {
+ WindowFunction<T, R, K, W> function) {
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
-
- return reduce(reduceFunction, function, resultType, legacyWindowOpType);
+ return reduce(reduceFunction, function, resultType);
}
/**
@@ -310,15 +251,12 @@ public class WindowedStream<T, K, W extends Window> {
* @param reduceFunction The reduce function that is used for incremental aggregation.
* @param function The window function.
* @param resultType Type information for the result type of the window function.
- * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
- * the type of the previous operator whose state we inherit.
* @return The data stream that is the result of applying the window function to the window.
*/
- private <R> SingleOutputStreamOperator<R> reduce(
+ public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,
WindowFunction<T, R, K, W> function,
- TypeInformation<R> resultType,
- LegacyWindowOperatorType legacyWindowOpType) {
+ TypeInformation<R> resultType) {
if (reduceFunction instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
@@ -374,8 +312,7 @@ public class WindowedStream<T, K, W extends Window> {
new InternalSingleValueWindowFunction<>(function),
trigger,
allowedLateness,
- lateDataOutputTag,
- legacyWindowOpType);
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
@@ -1183,12 +1120,6 @@ public class WindowedStream<T, K, W extends Window> {
String udfName = "WindowedStream." + callLocation;
- SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
- LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);
String opName;
KeySelector<T, K> keySel = input.getKeySelector();
@@ -1231,8 +1162,7 @@ public class WindowedStream<T, K, W extends Window> {
function,
trigger,
allowedLateness,
- lateDataOutputTag,
- legacyWindowOpType);
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
@@ -1629,119 +1559,6 @@ public class WindowedStream<T, K, W extends Window> {
return reduce(aggregator);
}
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private LegacyWindowOperatorType getLegacyWindowType(Function function) {
- if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- if (function instanceof ReduceFunction) {
- return LegacyWindowOperatorType.FAST_AGGREGATING;
- } else if (function instanceof WindowFunction) {
- return LegacyWindowOperatorType.FAST_ACCUMULATING;
- }
- } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- if (function instanceof ReduceFunction) {
- return LegacyWindowOperatorType.FAST_AGGREGATING;
- } else if (function instanceof WindowFunction) {
- return LegacyWindowOperatorType.FAST_ACCUMULATING;
- }
- }
- return LegacyWindowOperatorType.NONE;
- }
-
- private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
- ReduceFunction<?> function,
- TypeInformation<R> resultType,
- String functionName) {
-
- if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
- SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSlide();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
-
- } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
- TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSize();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer,
- input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
-
- return null;
- }
-
- private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
- InternalWindowFunction<Iterable<T>, R, K, W> function,
- TypeInformation<R> resultType,
- String functionName) {
-
- if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
- SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSlide();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- @SuppressWarnings("unchecked")
- InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
- (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- timeWindowFunction, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
- TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSize();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- @SuppressWarnings("unchecked")
- InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
- (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- timeWindowFunction, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
-
- return null;
- }
-
public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 3c4cfbd..fedd791 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -32,7 +32,6 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -66,7 +65,7 @@ import java.util.TreeMap;
*/
@Internal
public class ContinuousFileMonitoringFunction<OUT>
- extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction, CheckpointedRestoring<Long> {
+ extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
@@ -375,12 +374,4 @@ public class ContinuousFileMonitoringFunction<OUT>
LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
}
}
-
- @Override
- public void restoreState(Long state) throws Exception {
- this.globalModificationTime = state;
-
- LOG.info("{} (taskIdx={}) restored global modification time from an older Flink version: {}",
- getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), globalModificationTime);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 3a9e8e1..e14cfda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -25,30 +25,23 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
@@ -60,15 +53,15 @@ import static org.apache.flink.util.Preconditions.checkState;
* The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding
* {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
* which has a parallelism of 1, this operator can have DOP > 1.
- * <p/>
- * As soon as a split descriptor is received, it is put in a queue, and have another
+ *
+ * <p>As soon as a split descriptor is received, it is put in a queue, and have another
* thread read the actual data of the split. This architecture allows the separation of the
* reading thread from the one emitting the checkpoint barriers, thus removing any potential
* back-pressure.
*/
@Internal
public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, CheckpointedRestoringOperator {
+ implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
@@ -422,83 +415,4 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
}
}
-
- // ------------------------------------------------------------------------
- // Restoring / Migrating from an older Flink version.
- // ------------------------------------------------------------------------
-
- @Override
- public void restoreState(FSDataInputStream in) throws Exception {
-
- LOG.info("{} (taskIdx={}) restoring state from an older Flink version.",
- getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
-
- // this is just to read the byte indicating if we have udf state or not
- int hasUdfState = in.read();
-
- Preconditions.checkArgument(hasUdfState == 0);
-
- final ObjectInputStream ois = new ObjectInputStream(in);
- final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
-
- // read the split that was being read
- FileInputSplit currSplit = (FileInputSplit) ois.readObject();
-
- // read the pending splits list
- List<FileInputSplit> pendingSplits = new LinkedList<>();
- int noOfSplits = div.readInt();
- for (int i = 0; i < noOfSplits; i++) {
- FileInputSplit split = (FileInputSplit) ois.readObject();
- pendingSplits.add(split);
- }
-
- // read the state of the format
- Serializable formatState = (Serializable) ois.readObject();
-
- div.close();
-
- if (restoredReaderState == null) {
- restoredReaderState = new ArrayList<>();
- }
-
- // we do not know the modification time of the retrieved splits, so we assign them
- // artificial ones, with the only constraint that they respect the relative order of the
- // retrieved splits, because modification time is going to be used to sort the splits within
- // the "pending splits" priority queue.
-
- long now = getProcessingTimeService().getCurrentProcessingTime();
- long runningModTime = Math.max(now, noOfSplits + 1);
-
- TimestampedFileInputSplit currentSplit = createTimestampedFileSplit(currSplit, --runningModTime, formatState);
- restoredReaderState.add(currentSplit);
- for (FileInputSplit split : pendingSplits) {
- TimestampedFileInputSplit timestampedSplit = createTimestampedFileSplit(split, --runningModTime);
- restoredReaderState.add(timestampedSplit);
- }
-
- if (LOG.isDebugEnabled()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} (taskIdx={}) restored {} splits from legacy: {}.",
- getClass().getSimpleName(),
- getRuntimeContext().getIndexOfThisSubtask(),
- restoredReaderState.size(),
- restoredReaderState);
- }
- }
- }
-
- private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime) {
- return createTimestampedFileSplit(split, modificationTime, null);
- }
-
- private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime, Serializable state) {
- TimestampedFileInputSplit timestampedSplit = new TimestampedFileInputSplit(
- modificationTime, split.getSplitNumber(), split.getPath(),
- split.getStart(), split.getLength(), split.getHostnames());
-
- if (state != null) {
- timestampedSplit.setSplitState(state);
- }
- return timestampedSplit;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index abaa74e..884b899 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -116,7 +115,7 @@ public class StreamingJobGraphGenerator {
private StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
- this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
+ this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
this.jobVertices = new HashMap<>();
this.builtVertices = new HashSet<>();
@@ -241,14 +240,14 @@ public class StreamingJobGraphGenerator {
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
- List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId);
- if (operatorHashes == null) {
- operatorHashes = new ArrayList<>();
- chainedOperatorHashes.put(startNodeId, operatorHashes);
- }
+ List<Tuple2<byte[], byte[]>> operatorHashes =
+ chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
- operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHashes.get(1).get(currentNodeId)));
+
+ for (Map<Integer, byte[]> legacyHash : legacyHashes) {
+ operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
+ }
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a72b9fe..a28fc30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
@@ -55,7 +54,6 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -251,42 +249,6 @@ public abstract class AbstractStreamOperator<OUT>
getContainingTask().getCancelables()); // access to register streams for canceling
initializeState(initializationContext);
-
- if (restoring) {
-
- // finally restore the legacy state in case we are
- // migrating from a previous Flink version.
-
- restoreStreamCheckpointed(stateHandles);
- }
- }
-
- /**
- * @deprecated Non-repartitionable operator state that has been deprecated.
- * Can be removed when we remove the APIs for non-repartitionable operator state.
- */
- @Deprecated
- private void restoreStreamCheckpointed(OperatorSubtaskState stateHandles) throws Exception {
- StreamStateHandle state = stateHandles.getLegacyOperatorState();
- if (null != state) {
- if (this instanceof CheckpointedRestoringOperator) {
-
- LOG.debug("Restore state of task {} in operator with id ({}).",
- getContainingTask().getName(), getOperatorID());
-
- FSDataInputStream is = state.openInputStream();
- try {
- getContainingTask().getCancelables().registerClosable(is);
- ((CheckpointedRestoringOperator) this).restoreState(is);
- } finally {
- getContainingTask().getCancelables().unregisterClosable(is);
- is.close();
- }
- } else {
- throw new Exception(
- "Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
- }
- }
}
/**
@@ -451,35 +413,6 @@ public abstract class AbstractStreamOperator<OUT>
}
/**
- * @deprecated Non-repartitionable operator state that has been deprecated.
- * Can be removed when we remove the APIs for non-repartitionable operator state.
- */
- @SuppressWarnings("deprecation")
- @Deprecated
- @Override
- public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
- if (this instanceof StreamCheckpointedOperator) {
- CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
-
- final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
- factory.createCheckpointStateOutputStream(checkpointId, timestamp);
-
- getContainingTask().getCancelables().registerClosable(outStream);
-
- try {
- ((StreamCheckpointedOperator) this).snapshotState(outStream, checkpointId, timestamp);
- return outStream.closeAndGetHandle();
- }
- finally {
- getContainingTask().getCancelables().unregisterClosable(outStream);
- outStream.close();
- }
- } else {
- return null;
- }
- }
-
- /**
* Stream operators with state which can be restored need to override this hook method.
*
* @param context context that allows to register different states.