You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/27 16:26:19 UTC
[2/8] flink git commit: [hotfix] [streaming] Move SerializedCheckpointData to proper scope for MessageAcknowledgingSourceBase
[hotfix] [streaming] Move SerializedCheckpointData to proper scope for MessageAcknowledgingSourceBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed01e67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed01e67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed01e67
Branch: refs/heads/master
Commit: 9ed01e67b3f440707af149b8c2ca5ca4a66db3c1
Parents: f0964e1
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 24 21:20:44 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 27 17:37:22 2017 +0200
----------------------------------------------------------------------
.../runtime/state/SerializedCheckpointData.java | 175 ------------------
.../source/MessageAcknowledgingSourceBase.java | 1 -
.../source/SerializedCheckpointData.java | 180 +++++++++++++++++++
3 files changed, 180 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed01e67/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
deleted file mode 100644
index 394791b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * This class represents serialized checkpoint data for a collection of elements.
- */
-public class SerializedCheckpointData implements java.io.Serializable {
-
- private static final long serialVersionUID = -8783744683896503488L;
-
- /** ID of the checkpoint for which the IDs are stored */
- private final long checkpointId;
-
- /** The serialized elements */
- private final byte[] serializedData;
-
- /** The number of elements in the checkpoint */
- private final int numIds;
-
- /**
- * Creates a SerializedCheckpointData object for the given serialized data.
- *
- * @param checkpointId The checkpointId of the checkpoint.
- * @param serializedData The serialized IDs in this checkpoint.
- * @param numIds The number of IDs in the checkpoint.
- */
- public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
- this.checkpointId = checkpointId;
- this.serializedData = serializedData;
- this.numIds = numIds;
- }
-
- /**
- * Gets the checkpointId of the checkpoint.
- * @return The checkpointId of the checkpoint.
- */
- public long getCheckpointId() {
- return checkpointId;
- }
-
- /**
- * Gets the binary data for the serialized elements.
- * @return The binary data for the serialized elements.
- */
- public byte[] getSerializedData() {
- return serializedData;
- }
-
- /**
- * Gets the number of IDs in the checkpoint.
- * @return The number of IDs in the checkpoint.
- */
- public int getNumIds() {
- return numIds;
- }
-
- // ------------------------------------------------------------------------
- // Serialize to Checkpoint
- // ------------------------------------------------------------------------
-
- /**
- * Converts a list of checkpoints with elements into an array of SerializedCheckpointData.
- *
- * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
- * @param serializer The serializer to serialize the IDs.
- * @param <T> The type of the ID.
- * @return An array of serializable SerializedCheckpointData, one per entry in the
- *
- * @throws IOException Thrown, if the serialization fails.
- */
- public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
- TypeSerializer<T> serializer) throws IOException {
- return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
- }
-
- /**
- * Converts a list of checkpoints into an array of SerializedCheckpointData.
- *
- * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
- * @param serializer The serializer to serialize the IDs.
- * @param outputBuffer The reusable serialization buffer.
- * @param <T> The type of the ID.
- * @return An array of serializable SerializedCheckpointData, one per entry in the
- *
- * @throws IOException Thrown, if the serialization fails.
- */
- public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
- TypeSerializer<T> serializer,
- DataOutputSerializer outputBuffer) throws IOException {
- SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
-
- int pos = 0;
- for (Tuple2<Long, Set<T>> checkpoint : checkpoints) {
- outputBuffer.clear();
- Set<T> checkpointIds = checkpoint.f1;
-
- for (T id : checkpointIds) {
- serializer.serialize(id, outputBuffer);
- }
-
- serializedCheckpoints[pos++] = new SerializedCheckpointData(
- checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size());
- }
-
- return serializedCheckpoints;
- }
-
- // ------------------------------------------------------------------------
- // De-Serialize from Checkpoint
- // ------------------------------------------------------------------------
-
- /**
- * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
- *
- * @param data The data to be deserialized.
- * @param serializer The serializer used to deserialize the data.
- * @param <T> The type of the elements.
- * @return An ArrayDeque of element checkpoints.
- *
- * @throws IOException Thrown, if the serialization fails.
- */
- public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
- SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException {
- ArrayDeque<Tuple2<Long, Set<T>>> deque = new ArrayDeque<>(data.length);
- DataInputDeserializer deser = null;
-
- for (SerializedCheckpointData checkpoint : data) {
- byte[] serializedData = checkpoint.getSerializedData();
- if (deser == null) {
- deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
- }
- else {
- deser.setBuffer(serializedData, 0, serializedData.length);
- }
-
- final Set<T> ids = new HashSet<>(checkpoint.getNumIds());
- final int numIds = checkpoint.getNumIds();
-
- for (int i = 0; i < numIds; i++) {
- ids.add(serializer.deserialize(deser));
- }
-
- deque.addLast(new Tuple2<Long, Set<T>>(checkpoint.checkpointId, ids));
- }
-
- return deque;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed01e67/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 604755d..a0b3129 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Preconditions;
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed01e67/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
new file mode 100644
index 0000000..26cc5ad
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serializable holder for the elements of one checkpoint: checkpoint ID, serialized element bytes, and element count.
+ */
+class SerializedCheckpointData implements java.io.Serializable {
+
+ private static final long serialVersionUID = -8783744683896503488L;
+
+ /** ID of the checkpoint for which the IDs are stored. */
+ private final long checkpointId;
+
+ /** The serialized elements; {@code fromDeque} writes all elements of one checkpoint consecutively into this array. */
+ private final byte[] serializedData;
+
+ /** The number of elements in the checkpoint; {@code toDeque} reads exactly this many elements back. */
+ private final int numIds;
+
+ /**
+ * Creates a SerializedCheckpointData object for the given serialized data.
+ *
+ * @param checkpointId The checkpointId of the checkpoint.
+ * @param serializedData The serialized IDs in this checkpoint (stored by reference, not copied).
+ * @param numIds The number of IDs in the checkpoint.
+ */
+ public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
+ this.checkpointId = checkpointId;
+ this.serializedData = serializedData;
+ this.numIds = numIds;
+ }
+
+ /**
+ * Gets the checkpointId of the checkpoint.
+ * @return The checkpointId of the checkpoint.
+ */
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ /**
+ * Gets the binary data for the serialized elements.
+ * @return The binary data for the serialized elements (the internal array, not a copy).
+ */
+ public byte[] getSerializedData() {
+ return serializedData;
+ }
+
+ /**
+ * Gets the number of IDs in the checkpoint.
+ * @return The number of IDs in the checkpoint.
+ */
+ public int getNumIds() {
+ return numIds;
+ }
+
+ // ------------------------------------------------------------------------
+ // Serialize to Checkpoint
+ // ------------------------------------------------------------------------
+
+ /**
+ * Converts a list of checkpoints with elements into an array of SerializedCheckpointData, using a newly created buffer.
+ *
+ * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
+ * @param serializer The serializer to serialize the IDs.
+ * @param <T> The type of the ID.
+ * @return An array of serializable SerializedCheckpointData, one per entry in the queue.
+ *
+ * @throws IOException Thrown, if the serialization fails.
+ */
+ public static <T> SerializedCheckpointData[] fromDeque(
+ ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
+ TypeSerializer<T> serializer) throws IOException {
+ return fromDeque(checkpoints, serializer, new DataOutputSerializer(128)); // 128: presumably the initial buffer size - TODO confirm
+ }
+
+ /**
+ * Converts a list of checkpoints into an array of SerializedCheckpointData, in the iteration order of the queue.
+ *
+ * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
+ * @param serializer The serializer to serialize the IDs.
+ * @param outputBuffer The reusable serialization buffer; it is cleared before each checkpoint is written.
+ * @param <T> The type of the ID.
+ * @return An array of serializable SerializedCheckpointData, one per entry in the queue.
+ *
+ * @throws IOException Thrown, if the serialization fails.
+ */
+ public static <T> SerializedCheckpointData[] fromDeque(
+ ArrayDeque<Tuple2<Long, Set<T>>> checkpoints,
+ TypeSerializer<T> serializer,
+ DataOutputSerializer outputBuffer) throws IOException {
+
+ SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
+
+ int pos = 0;
+ for (Tuple2<Long, Set<T>> checkpoint : checkpoints) {
+ outputBuffer.clear(); // the one buffer is shared by all checkpoints, so reset it for each entry
+ Set<T> checkpointIds = checkpoint.f1;
+
+ for (T id : checkpointIds) {
+ serializer.serialize(id, outputBuffer);
+ }
+
+ serializedCheckpoints[pos++] = new SerializedCheckpointData(
+ checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size()); // f0 = checkpoint ID; size() becomes numIds
+ }
+
+ return serializedCheckpoints;
+ }
+
+ // ------------------------------------------------------------------------
+ // De-Serialize from Checkpoint
+ // ------------------------------------------------------------------------
+
+ /**
+ * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints, in array order.
+ *
+ * @param data The data to be deserialized.
+ * @param serializer The serializer used to deserialize the data.
+ * @param <T> The type of the elements.
+ * @return An ArrayDeque of element checkpoints.
+ *
+ * @throws IOException Thrown, if the deserialization fails.
+ */
+ public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
+ SerializedCheckpointData[] data,
+ TypeSerializer<T> serializer) throws IOException {
+
+ ArrayDeque<Tuple2<Long, Set<T>>> deque = new ArrayDeque<>(data.length);
+ DataInputDeserializer deser = null; // created on the first entry, then re-pointed at each later entry's bytes
+
+ for (SerializedCheckpointData checkpoint : data) {
+ byte[] serializedData = checkpoint.getSerializedData();
+ if (deser == null) {
+ deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
+ }
+ else {
+ deser.setBuffer(serializedData, 0, serializedData.length);
+ }
+
+ final Set<T> ids = new HashSet<>(checkpoint.getNumIds()); // numIds is passed as the set's initial capacity
+ final int numIds = checkpoint.getNumIds();
+
+ for (int i = 0; i < numIds; i++) {
+ ids.add(serializer.deserialize(deser));
+ }
+
+ deque.addLast(new Tuple2<Long, Set<T>>(checkpoint.checkpointId, ids));
+ }
+
+ return deque;
+ }
+}