You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/06 15:04:42 UTC

[3/4] flink git commit: [FLINK-4730] Introducing CheckpointMetaData

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ff14249..8ada6d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -46,9 +47,9 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -558,9 +559,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
 		try {
-			return performCheckpoint(checkpointId, timestamp, 0L, 0L);
+			checkpointMetaData.
+					setBytesBufferedInAlignment(0L).
+					setAlignmentDurationNanos(0L);
+			return performCheckpoint(checkpointMetaData);
 		}
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
@@ -573,11 +577,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void triggerCheckpointOnBarrier(
-			long checkpointId, long timestamp, long bytesAligned, long alignmentDurationNanos) throws Exception {
+	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 
 		try {
-			performCheckpoint(checkpointId, timestamp, bytesAligned, alignmentDurationNanos);
+			performCheckpoint(checkpointMetaData);
 		}
 		catch (CancelTaskException e) {
 			throw e;
@@ -587,8 +590,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private boolean performCheckpoint(
-			long checkpointId, long timestamp, long bytesBufferedAlignment, long alignmentDurationNanos) throws Exception {
+	private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+
+		long checkpointId = checkpointMetaData.getCheckpointId();
+		long timestamp = checkpointMetaData.getTimestamp();
 
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 
@@ -674,8 +679,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
 
-				final long endOfSyncPart = System.nanoTime();
-				final long syncDurationMillis = (endOfSyncPart - startOfSyncPart) / 1_000_000;
+				final long syncEndNanos = System.nanoTime();
+				final long syncDurationMillis = (syncEndNanos - startOfSyncPart) / 1_000_000;
+
+				checkpointMetaData.setSyncDurationMillis(syncDurationMillis);
 
 				AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
 						"checkpoint-" + checkpointId + "-" + timestamp,
@@ -684,11 +691,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						chainedNonPartitionedStateHandles,
 						chainedPartitionedStateHandles,
 						keyGroupsStateHandleFuture,
-						checkpointId,
-						bytesBufferedAlignment,
-						alignmentDurationNanos,
-						syncDurationMillis,
-						endOfSyncPart);
+						checkpointMetaData,
+						syncEndNanos);
 
 				cancelables.registerClosable(asyncCheckpointRunnable);
 				asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
@@ -696,7 +700,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished synchronous part of checkpoint {}." +
 							"Alignment duration: {} ms, snapshot duration {} ms",
-							getName(), checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+							getName(), checkpointId, checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, syncDurationMillis);
 				}
 
 				return true;
@@ -914,15 +918,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		private final RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture;
 
-		private final long checkpointId;
-
 		private final String name;
 
-		private final long bytesBufferedInAlignment;
-
-		private final long alignmentDurationNanos;
-
-		private final long syncDurationMillies;
+		private final CheckpointMetaData checkpointMetaData;
 
 		private final long asyncStartNanos;
 
@@ -933,11 +931,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
 				ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles,
 				RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture,
-				long checkpointId,
-				long bytesBufferedInAlignment,
-				long alignmentDurationNanos,
-				long syncDurationMillies,
-				long asyncStartNanos) {
+				CheckpointMetaData checkpointMetaData,
+				long asyncStartNanos
+		) {
 
 			this.name = name;
 			this.owner = owner;
@@ -945,10 +941,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			this.nonPartitionedStateHandles = nonPartitionedStateHandles;
 			this.partitioneableStateHandles = partitioneableStateHandles;
 			this.keyGroupsStateHandleFuture = keyGroupsStateHandleFuture;
-			this.checkpointId = checkpointId;
-			this.bytesBufferedInAlignment = bytesBufferedInAlignment;
-			this.alignmentDurationNanos = alignmentDurationNanos;
-			this.syncDurationMillies = syncDurationMillies;
+			this.checkpointMetaData = checkpointMetaData;
 			this.asyncStartNanos = asyncStartNanos;
 		}
 
@@ -974,26 +967,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				final long asyncEndNanos = System.nanoTime();
 				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
 
+				checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
+
 				if (nonPartitionedStateHandles.isEmpty() && partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
-					owner.getEnvironment().acknowledgeCheckpoint(
-							checkpointId,
-							syncDurationMillies, asyncDurationMillis,
-							bytesBufferedInAlignment, alignmentDurationNanos);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData);
 				} else {
 					CheckpointStateHandles allStateHandles = new CheckpointStateHandles(
 							nonPartitionedStateHandles,
 							partitioneableStateHandles,
 							keyedStates);
 
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
-							allStateHandles,
-							syncDurationMillies, asyncDurationMillis,
-							bytesBufferedInAlignment, alignmentDurationNanos);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, allStateHandles);
 				}
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", 
-							owner.getName(), checkpointId, asyncDurationMillis);
+							owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
 				}
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 5d68841..2cabd70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -981,19 +982,17 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp,
-				long bytesAligned, long alignmentTimeNanos) throws Exception {
-
-			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
-			assertTrue(timestamp > 0);
-			assertTrue(bytesAligned >= 0);
-			assertTrue(alignmentTimeNanos >= 0);
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+
+			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
+			assertTrue(checkpointMetaData.getTimestamp() > 0);
+			assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
+			assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
 
 			nextExpectedCheckpointId++;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index f2f9092..b6d0450 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -374,18 +375,16 @@ public class BarrierTrackerTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(
-				long checkpointId, long timestamp,
-				long bytesAligned, long alignmentTimeNanos) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
 
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-			assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointId);
-			assertTrue(timestamp > 0);
+			assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointMetaData.getCheckpointId());
+			assertTrue(checkpointMetaData.getTimestamp() > 0);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index f6e7dca..1b2b723 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -374,7 +375,9 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke(env);
 		testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
 
-		while(!streamTask.triggerCheckpoint(checkpointId, checkpointTimestamp));
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
+
+		while(!streamTask.triggerCheckpoint(checkpointMetaData));
 
 		// since no state was set, there shouldn't be restore calls
 		assertEquals(0, TestingStreamOperator.numberRestoreCalls);
@@ -517,11 +520,10 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		@Override
 		public void acknowledgeCheckpoint(
-				long checkpointId,
-				CheckpointStateHandles checkpointStateHandles,
-				long syncDuration, long asymcDuration, long alignmentByte, long alignmentDuration) {
+				CheckpointMetaData checkpointMetaData,
+				CheckpointStateHandles checkpointStateHandles) {
 
-			this.checkpointId = checkpointId;
+			this.checkpointId = checkpointMetaData.getCheckpointId();
 			if(checkpointStateHandles != null) {
 				this.state = checkpointStateHandles.getNonPartitionedStateHandles();
 				this.keyGroupStates = checkpointStateHandles.getKeyGroupsStateHandle();

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 1e62e28..10fc400 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -29,11 +30,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -235,7 +234,8 @@ public class SourceStreamTaskTest {
 		public Boolean call() throws Exception {
 			for (int i = 0; i < numCheckpoints; i++) {
 				long currentCheckpointId = checkpointId.getAndIncrement();
-				sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
+				CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
+				sourceTask.triggerCheckpoint(checkpointMetaData);
 				Thread.sleep(checkpointInterval);
 			}
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9b773d8..f852682 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -43,18 +43,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -311,18 +308,12 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(
-			long checkpointId,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+	public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
-			long checkpointId,
-			CheckpointStateHandles checkpointStateHandles,
-			long synchronousDurationMillis, long asynchronousDurationMillis,
-			long bytesBufferedInAlignment, long alignmentDurationNanos) {
+			CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
 	}
 
 	@Override