You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:34 UTC
[6/7] flink git commit: [FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.
[FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ffe75a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ffe75a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ffe75a5
Branch: refs/heads/master
Commit: 8ffe75a54f24cbd8e69c455b42a4e969b943a279
Parents: df16e50
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 15:04:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/CheckpointOptions.java | 2 +-
.../io/network/api/CheckpointBarrier.java | 66 +++++++-------------
.../io/network/api/CheckpointBarrierTest.java | 40 ++++++------
.../api/serialization/EventSerializerTest.java | 13 ++--
4 files changed, 46 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index cb98d10..676cf3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -46,7 +46,7 @@ public class CheckpointOptions implements Serializable {
private CheckpointOptions(
@Nonnull CheckpointType checkpointType,
- String targetLocation) {
+ @Nullable String targetLocation) {
this.checkpointType = checkNotNull(checkpointType);
this.targetLocation = targetLocation;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index a42c25d..97ad90f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,18 +18,14 @@
package org.apache.flink.runtime.io.network.api;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.RuntimeEvent;
-import org.apache.flink.util.StringUtils;
/**
* Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -48,11 +44,9 @@ import org.apache.flink.util.StringUtils;
*/
public class CheckpointBarrier extends RuntimeEvent {
- private long id;
- private long timestamp;
- private CheckpointOptions checkpointOptions;
-
- public CheckpointBarrier() {}
+ private final long id;
+ private final long timestamp;
+ private final CheckpointOptions checkpointOptions;
public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
this.id = id;
@@ -75,66 +69,48 @@ public class CheckpointBarrier extends RuntimeEvent {
// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------
-
+
+ //
+ // These methods are inherited from the generic serialization of AbstractEvent
+ // but would require the CheckpointBarrier to be mutable. Since all serialization
+ // for events goes through the EventSerializer class, which has special serialization
+ // for the CheckpointBarrier, we don't need these methods
+ //
+
@Override
public void write(DataOutputView out) throws IOException {
- out.writeLong(id);
- out.writeLong(timestamp);
- CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-
- out.writeInt(checkpointType.ordinal());
-
- if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
- return;
- } else if (checkpointType == CheckpointType.SAVEPOINT) {
- String targetLocation = checkpointOptions.getTargetLocation();
- checkState(targetLocation != null);
- StringUtils.writeString(targetLocation, out);
- } else {
- throw new IOException("Unknown CheckpointType " + checkpointType);
- }
+ throw new UnsupportedOperationException("This method should never be called");
}
@Override
public void read(DataInputView in) throws IOException {
- id = in.readLong();
- timestamp = in.readLong();
-
- int typeOrdinal = in.readInt();
- checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
- CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
-
- if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
- checkpointOptions = CheckpointOptions.forFullCheckpoint();
- } else if (checkpointType == CheckpointType.SAVEPOINT) {
- String targetLocation = StringUtils.readString(in);
- checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
- } else {
- throw new IOException("Illegal CheckpointType " + checkpointType);
- }
+ throw new UnsupportedOperationException("This method should never be called");
}
-
// ------------------------------------------------------------------------
@Override
public int hashCode() {
- return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+ return (int) (id ^ (id >>> 32) ^ timestamp ^ (timestamp >>> 32));
}
@Override
public boolean equals(Object other) {
- if (other == null || !(other instanceof CheckpointBarrier)) {
+ if (other == this) {
+ return true;
+ }
+ else if (other == null || other.getClass() != CheckpointBarrier.class) {
return false;
}
else {
CheckpointBarrier that = (CheckpointBarrier) other;
- return that.id == this.id && that.timestamp == this.timestamp;
+ return that.id == this.id && that.timestamp == this.timestamp &&
+ this.checkpointOptions.equals(that.checkpointOptions);
}
}
@Override
public String toString() {
- return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+ return String.format("CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointOptions);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index ad9fc16..ba833c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -18,44 +18,40 @@
package org.apache.flink.runtime.io.network.api;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
+
import org.junit.Test;
+import static org.junit.Assert.fail;
+
public class CheckpointBarrierTest {
/**
* Test serialization of the checkpoint barrier.
+ * The checkpoint barrier does not support its own serialization, in order to be immutable.
*/
@Test
public void testSerialization() throws Exception {
long id = Integer.MAX_VALUE + 123123L;
long timestamp = Integer.MAX_VALUE + 1228L;
- CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
- testSerialization(id, timestamp, checkpoint);
-
- CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
- testSerialization(id, timestamp, savepoint);
- }
-
- private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+ CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
- DataOutputSerializer out = new DataOutputSerializer(1024);
- barrier.write(out);
-
- DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
- CheckpointBarrier deserialized = new CheckpointBarrier();
- deserialized.read(in);
-
- assertEquals(id, deserialized.getId());
- assertEquals(timestamp, deserialized.getTimestamp());
- assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
- assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+ try {
+ barrier.write(new DataOutputSerializer(1024));
+ fail("should throw an exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ barrier.read(new DataInputDeserializer(new byte[32]));
+ fail("should throw an exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index e674eb7..f51b083 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -147,15 +147,14 @@ public class EventSerializerTest {
* thinks the encoded buffer matches the class
* @throws IOException
*/
- private final boolean checkIsEvent(final AbstractEvent event,
- final Class<? extends AbstractEvent> eventClass) throws
- IOException {
- final Buffer serializedEvent =
- EventSerializer.toBuffer(event);
+ private boolean checkIsEvent(
+ AbstractEvent event,
+ Class<? extends AbstractEvent> eventClass) throws IOException {
+
+ final Buffer serializedEvent = EventSerializer.toBuffer(event);
try {
final ClassLoader cl = getClass().getClassLoader();
- return EventSerializer
- .isEvent(serializedEvent, eventClass, cl);
+ return EventSerializer.isEvent(serializedEvent, eventClass, cl);
} finally {
serializedEvent.recycle();
}