You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/11 17:53:38 UTC

flink git commit: [FLINK-7149] [connectors] Add checkpoint ID to 'sendValues()' in GenericWriteAheadSink

Repository: flink
Updated Branches:
  refs/heads/release-1.3 ad8766e10 -> faf797456


[FLINK-7149] [connectors] Add checkpoint ID to 'sendValues()' in GenericWriteAheadSink


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/faf79745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/faf79745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/faf79745

Branch: refs/heads/release-1.3
Commit: faf79745659130b6da61034e275abbb3bcb3371c
Parents: ad8766e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 11 12:46:31 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 11 19:52:53 2017 +0200

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraTupleWriteAheadSink.java  | 2 +-
 .../cassandra/CassandraTupleWriteAheadSinkTest.java         | 4 ++--
 .../streaming/runtime/operators/GenericWriteAheadSink.java  | 9 +++++++--
 .../runtime/operators/GenericWriteAheadSinkTest.java        | 4 ++--
 4 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index a3d002e..62116ff 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -95,7 +95,7 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite
 	}
 
 	@Override
-	protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
+	protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception {
 		final AtomicInteger updatesCount = new AtomicInteger(0);
 		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
index 847d1a0..2ccba9f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
@@ -113,14 +113,14 @@ public class CassandraTupleWriteAheadSinkTest {
 			cc
 		);
 
-		OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness(sink);
+		OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness<>(sink);
 		harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true);
 
 		harness.setup();
 		sink.open();
 
 		// we should leave the loop and return false since we've seen an exception
-		assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 0L));
+		assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 1L, 0L));
 
 		sink.close();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 35f420a..1ccaff9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -227,6 +227,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 																in),
 														serializer),
 												serializer),
+										checkpointId,
 										timestamp);
 								if (success) {
 									// in case the checkpoint was successfully committed,
@@ -255,11 +256,15 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	/**
 	 * Write the given element into the backend.
 	 *
-	 * @param value value to be written
+	 * @param values The values to be written
+	 * @param checkpointId The checkpoint ID of the checkpoint to be written
+	 * @param timestamp The wall-clock timestamp of the checkpoint
+	 *
 	 * @return true, if the sending was successful, false otherwise
+	 *
 	 * @throws Exception
 	 */
-	protected abstract boolean sendValues(Iterable<IN> value, long timestamp) throws Exception;
+	protected abstract boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 9bcd2e6..47babfe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -174,7 +174,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 		}
 
 		@Override
-		protected boolean sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
+		protected boolean sendValues(Iterable<Tuple1<Integer>> values, long checkpointId, long timestamp) throws Exception {
 			for (Tuple1<Integer> value : values) {
 				this.values.add(value.f0);
 			}
@@ -224,7 +224,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 		}
 
 		@Override
-		protected boolean sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
+		protected boolean sendValues(Iterable<Tuple1<Integer>> values, long checkpointId, long timestamp) throws Exception {
 			for (Tuple1<Integer> value : values) {
 				this.values.add(value.f0);
 			}