You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:34 UTC

[6/7] flink git commit: [FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.

[FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ffe75a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ffe75a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ffe75a5

Branch: refs/heads/master
Commit: 8ffe75a54f24cbd8e69c455b42a4e969b943a279
Parents: df16e50
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 15:04:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointOptions.java   |  2 +-
 .../io/network/api/CheckpointBarrier.java       | 66 +++++++-------------
 .../io/network/api/CheckpointBarrierTest.java   | 40 ++++++------
 .../api/serialization/EventSerializerTest.java  | 13 ++--
 4 files changed, 46 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index cb98d10..676cf3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -46,7 +46,7 @@ public class CheckpointOptions implements Serializable {
 
 	private CheckpointOptions(
 			@Nonnull CheckpointType checkpointType,
-			String targetLocation) {
+			@Nullable  String targetLocation) {
 		this.checkpointType = checkNotNull(checkpointType);
 		this.targetLocation = targetLocation;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index a42c25d..97ad90f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
-import org.apache.flink.util.StringUtils;
 
 /**
  * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -48,11 +44,9 @@ import org.apache.flink.util.StringUtils;
  */
 public class CheckpointBarrier extends RuntimeEvent {
 
-	private long id;
-	private long timestamp;
-	private CheckpointOptions checkpointOptions;
-
-	public CheckpointBarrier() {}
+	private final long id;
+	private final long timestamp;
+	private final CheckpointOptions checkpointOptions;
 
 	public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
 		this.id = id;
@@ -75,66 +69,48 @@ public class CheckpointBarrier extends RuntimeEvent {
 	// ------------------------------------------------------------------------
 	// Serialization
 	// ------------------------------------------------------------------------
-	
+
+	//
+	//  These methods are inherited from the generic serialization of AbstractEvent
+	//  but would require the CheckpointBarrier to be mutable. Since all serialization
+	//  for events goes through the EventSerializer class, which has special serialization
+	//  for the CheckpointBarrier, we don't need these methods.
+	// 
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
-		out.writeLong(id);
-		out.writeLong(timestamp);
-		CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-
-		out.writeInt(checkpointType.ordinal());
-
-		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
-			return;
-		} else if (checkpointType == CheckpointType.SAVEPOINT) {
-			String targetLocation = checkpointOptions.getTargetLocation();
-			checkState(targetLocation != null);
-			StringUtils.writeString(targetLocation, out);
-		} else {
-			throw new IOException("Unknown CheckpointType " + checkpointType);
-		}
+		throw new UnsupportedOperationException("This method should never be called");
 	}
 
 	@Override
 	public void read(DataInputView in) throws IOException {
-		id = in.readLong();
-		timestamp = in.readLong();
-
-		int typeOrdinal = in.readInt();
-		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
-		CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
-
-		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
-			checkpointOptions = CheckpointOptions.forFullCheckpoint();
-		} else if (checkpointType == CheckpointType.SAVEPOINT) {
-			String targetLocation = StringUtils.readString(in);
-			checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
-		} else {
-			throw new IOException("Illegal CheckpointType " + checkpointType);
-		}
+		throw new UnsupportedOperationException("This method should never be called");
 	}
 
-
 	// ------------------------------------------------------------------------
 
 	@Override
 	public int hashCode() {
-		return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+		return (int) (id ^ (id >>> 32) ^ timestamp ^ (timestamp >>> 32));
 	}
 
 	@Override
 	public boolean equals(Object other) {
-		if (other == null || !(other instanceof CheckpointBarrier)) {
+		if (other == this) {
+			return true;
+		}
+		else if (other == null || other.getClass() != CheckpointBarrier.class) {
 			return false;
 		}
 		else {
 			CheckpointBarrier that = (CheckpointBarrier) other;
-			return that.id == this.id && that.timestamp == this.timestamp;
+			return that.id == this.id && that.timestamp == this.timestamp &&
+					this.checkpointOptions.equals(that.checkpointOptions);
 		}
 	}
 
 	@Override
 	public String toString() {
-		return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+		return String.format("CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointOptions);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index ad9fc16..ba833c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -18,44 +18,40 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
+
 import org.junit.Test;
 
+import static org.junit.Assert.fail;
+
 public class CheckpointBarrierTest {
 
 	/**
 	 * Test serialization of the checkpoint barrier.
+	 * The checkpoint barrier does not support its own serialization, in order to be immutable.
 	 */
 	@Test
 	public void testSerialization() throws Exception {
 		long id = Integer.MAX_VALUE + 123123L;
 		long timestamp = Integer.MAX_VALUE + 1228L;
 
-		CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
-		testSerialization(id, timestamp, checkpoint);
-
-		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
-		testSerialization(id, timestamp, savepoint);
-	}
-
-	private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+		CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
 		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
 
-		DataOutputSerializer out = new DataOutputSerializer(1024);
-		barrier.write(out);
-
-		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
-		CheckpointBarrier deserialized = new CheckpointBarrier();
-		deserialized.read(in);
-
-		assertEquals(id, deserialized.getId());
-		assertEquals(timestamp, deserialized.getTimestamp());
-		assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
-		assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+		try {
+			barrier.write(new DataOutputSerializer(1024));
+			fail("should throw an exception");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
+
+		try {
+			barrier.read(new DataInputDeserializer(new byte[32]));
+			fail("should throw an exception");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index e674eb7..f51b083 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -147,15 +147,14 @@ public class EventSerializerTest {
 	 * 		thinks the encoded buffer matches the class
 	 * @throws IOException
 	 */
-	private final boolean checkIsEvent(final AbstractEvent event,
-		final Class<? extends AbstractEvent> eventClass) throws
-		IOException {
-		final Buffer serializedEvent =
-			EventSerializer.toBuffer(event);
+	private boolean checkIsEvent(
+			AbstractEvent event, 
+			Class<? extends AbstractEvent> eventClass) throws IOException {
+
+		final Buffer serializedEvent = EventSerializer.toBuffer(event);
 		try {
 			final ClassLoader cl = getClass().getClassLoader();
-			return EventSerializer
-				.isEvent(serializedEvent, eventClass, cl);
+			return EventSerializer.isEvent(serializedEvent, eventClass, cl);
 		} finally {
 			serializedEvent.recycle();
 		}