You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/27 16:26:19 UTC

[2/8] flink git commit: [hotfix] [streaming] Move SerializedCheckpointData to proper scope for MessageAcknowledgingSourceBase

[hotfix] [streaming] Move SerializedCheckpointData to proper scope for MessageAcknowledgingSourceBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed01e67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed01e67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed01e67

Branch: refs/heads/master
Commit: 9ed01e67b3f440707af149b8c2ca5ca4a66db3c1
Parents: f0964e1
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 24 21:20:44 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 27 17:37:22 2017 +0200

----------------------------------------------------------------------
 .../runtime/state/SerializedCheckpointData.java | 175 ------------------
 .../source/MessageAcknowledgingSourceBase.java  |   1 -
 .../source/SerializedCheckpointData.java        | 180 +++++++++++++++++++
 3 files changed, 180 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ed01e67/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
deleted file mode 100644
index 394791b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * This class represents serialized checkpoint data for a collection of elements.
- */
-public class SerializedCheckpointData implements java.io.Serializable {
-
-	private static final long serialVersionUID = -8783744683896503488L;
-	
-	/** ID of the checkpoint for which the IDs are stored */
-	private final long checkpointId;
-
-	/** The serialized elements */
-	private final byte[] serializedData;
-
-	/** The number of elements in the checkpoint */
-	private final int numIds;
-
-	/**
-	 * Creates a SerializedCheckpointData object for the given serialized data.
-	 * 
-	 * @param checkpointId The checkpointId of the checkpoint.
-	 * @param serializedData The serialized IDs in this checkpoint.
-	 * @param numIds The number of IDs in the checkpoint.
-	 */
-	public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
-		this.checkpointId = checkpointId;
-		this.serializedData = serializedData;
-		this.numIds = numIds;
-	}
-
-	/**
-	 * Gets the checkpointId of the checkpoint.
-	 * @return The checkpointId of the checkpoint.
-	 */
-	public long getCheckpointId() {
-		return checkpointId;
-	}
-
-	/**
-	 * Gets the binary data for the serialized elements.
-	 * @return The binary data for the serialized elements.
-	 */
-	public byte[] getSerializedData() {
-		return serializedData;
-	}
-
-	/**
-	 * Gets the number of IDs in the checkpoint.
-	 * @return The number of IDs in the checkpoint.
-	 */
-	public int getNumIds() {
-		return numIds;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Serialize to Checkpoint
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Converts a list of checkpoints with elements into an array of SerializedCheckpointData.
-	 * 
-	 * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
-	 * @param serializer The serializer to serialize the IDs.
-	 * @param <T> The type of the ID.
-	 * @return An array of serializable SerializedCheckpointData, one per entry in the 
-	 * 
-	 * @throws IOException Thrown, if the serialization fails.
-	 */
-	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
-												TypeSerializer<T> serializer) throws IOException {
-		return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
-	}
-
-	/**
-	 * Converts a list of checkpoints into an array of SerializedCheckpointData.
-	 *
-	 * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
-	 * @param serializer The serializer to serialize the IDs.
-	 * @param outputBuffer The reusable serialization buffer.
-	 * @param <T> The type of the ID.
-	 * @return An array of serializable SerializedCheckpointData, one per entry in the 
-	 *
-	 * @throws IOException Thrown, if the serialization fails.
-	 */
-	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
-												TypeSerializer<T> serializer,
-												DataOutputSerializer outputBuffer) throws IOException {
-		SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
-		
-		int pos = 0;
-		for (Tuple2<Long, Set<T>> checkpoint : checkpoints) {
-			outputBuffer.clear();
-			Set<T> checkpointIds = checkpoint.f1;
-			
-			for (T id : checkpointIds) {
-				serializer.serialize(id, outputBuffer);
-			}
-
-			serializedCheckpoints[pos++] = new SerializedCheckpointData(
-					checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size());
-		}
-		
-		return serializedCheckpoints;
-	}
-
-	// ------------------------------------------------------------------------
-	//  De-Serialize from Checkpoint
-	// ------------------------------------------------------------------------
-
-	/**
-	 * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
-	 * 
-	 * @param data The data to be deserialized.
-	 * @param serializer The serializer used to deserialize the data.
-	 * @param <T> The type of the elements.
-	 * @return An ArrayDeque of element checkpoints.
-	 * 
-	 * @throws IOException Thrown, if the serialization fails.
-	 */
-	public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
-			SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException {
-		ArrayDeque<Tuple2<Long, Set<T>>> deque = new ArrayDeque<>(data.length);
-		DataInputDeserializer deser = null;
-		
-		for (SerializedCheckpointData checkpoint : data) {
-			byte[] serializedData = checkpoint.getSerializedData();
-			if (deser == null) {
-				deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
-			}
-			else {
-				deser.setBuffer(serializedData, 0, serializedData.length);
-			}
-			
-			final Set<T> ids = new HashSet<>(checkpoint.getNumIds());
-			final int numIds = checkpoint.getNumIds();
-			
-			for (int i = 0; i < numIds; i++) {
-				ids.add(serializer.deserialize(deser));
-			}
-
-			deque.addLast(new Tuple2<Long, Set<T>>(checkpoint.checkpointId, ids));
-		}
-		
-		return deque;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed01e67/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 604755d..a0b3129 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed01e67/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
new file mode 100644
index 0000000..26cc5ad
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serialized form of one checkpoint's element IDs: the checkpoint ID, the encoded bytes, and the ID count.
+ */
+class SerializedCheckpointData implements java.io.Serializable {
+
+	private static final long serialVersionUID = -8783744683896503488L;
+
+	/** ID of the checkpoint for which the IDs are stored. */
+	private final long checkpointId;
+
+	/** The serialized elements, held as the array reference passed to the constructor. */
+	private final byte[] serializedData;
+
+	/** The number of elements in the checkpoint; toDeque reads exactly this many from serializedData. */
+	private final int numIds;
+
+	/**
+	 * Creates a SerializedCheckpointData object for the given serialized data.
+	 *
+	 * @param checkpointId The checkpointId of the checkpoint.
+	 * @param serializedData The serialized IDs in this checkpoint (stored by reference, not copied).
+	 * @param numIds The number of IDs in the checkpoint.
+	 */
+	public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
+		this.checkpointId = checkpointId;
+		this.serializedData = serializedData;
+		this.numIds = numIds;
+	}
+
+	/**
+	 * Gets the checkpointId of the checkpoint.
+	 * @return The checkpointId of the checkpoint.
+	 */
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	/**
+	 * Gets the binary data for the serialized elements.
+	 * @return The binary data for the serialized elements (the internal array, not a copy).
+	 */
+	public byte[] getSerializedData() {
+		return serializedData;
+	}
+
+	/**
+	 * Gets the number of IDs in the checkpoint.
+	 * @return The number of IDs in the checkpoint.
+	 */
+	public int getNumIds() {
+		return numIds;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serialize to Checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Converts a deque of checkpoints into an array of SerializedCheckpointData, using a newly created buffer.
+	 *
+	 * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
+	 * @param serializer The serializer to serialize the IDs.
+	 * @param <T> The type of the ID.
+	 * @return An array of serializable SerializedCheckpointData, one per entry in the queue.
+	 *
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public static <T> SerializedCheckpointData[] fromDeque(
+			ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
+			TypeSerializer<T> serializer) throws IOException {
+		return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
+	}
+
+	/**
+	 * Converts a deque of checkpoints into an array of SerializedCheckpointData, in iteration order.
+	 *
+	 * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
+	 * @param serializer The serializer to serialize the IDs.
+	 * @param outputBuffer The reusable serialization buffer; it is cleared before each checkpoint is written.
+	 * @param <T> The type of the ID.
+	 * @return An array of serializable SerializedCheckpointData, one per entry in the queue.
+	 *
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public static <T> SerializedCheckpointData[] fromDeque(
+			ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
+			TypeSerializer<T> serializer,
+			DataOutputSerializer outputBuffer) throws IOException {
+
+		SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
+
+		int pos = 0;
+		for (Tuple2<Long, Set<T>> checkpoint : checkpoints) {
+			outputBuffer.clear();
+			Set<T> checkpointIds = checkpoint.f1;
+
+			for (T id : checkpointIds) {
+				serializer.serialize(id, outputBuffer);
+			}
+
+			serializedCheckpoints[pos++] = new SerializedCheckpointData(
+					checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size());
+		}
+
+		return serializedCheckpoints;
+	}
+
+	// ------------------------------------------------------------------------
+	//  De-Serialize from Checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
+	 *
+	 * @param data The data to be deserialized.
+	 * @param serializer The serializer used to deserialize the data.
+	 * @param <T> The type of the elements.
+	 * @return An ArrayDeque of element checkpoints, in the same order as the input array.
+	 *
+	 * @throws IOException Thrown, if the deserialization fails.
+	 */
+	public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
+			SerializedCheckpointData[] data,
+			TypeSerializer<T> serializer) throws IOException {
+
+		ArrayDeque<Tuple2<Long, Set<T>>> deque = new ArrayDeque<>(data.length);
+		DataInputDeserializer deser = null;
+
+		for (SerializedCheckpointData checkpoint : data) {
+			byte[] serializedData = checkpoint.getSerializedData();
+			if (deser == null) {
+				deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
+			}
+			else {
+				deser.setBuffer(serializedData, 0, serializedData.length);
+			}
+
+			final Set<T> ids = new HashSet<>(checkpoint.getNumIds());
+			final int numIds = checkpoint.getNumIds();
+
+			for (int i = 0; i < numIds; i++) {
+				ids.add(serializer.deserialize(deser));
+			}
+
+			deque.addLast(new Tuple2<Long, Set<T>>(checkpoint.checkpointId, ids));
+		}
+
+		return deque;
+	}
+}