You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/11 17:53:38 UTC
flink git commit: [FLINK-7149] [connectors] Add checkpoint ID to
'sendValues()' in GenericWriteAheadSink
Repository: flink
Updated Branches:
refs/heads/release-1.3 ad8766e10 -> faf797456
[FLINK-7149] [connectors] Add checkpoint ID to 'sendValues()' in GenericWriteAheadSink
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/faf79745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/faf79745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/faf79745
Branch: refs/heads/release-1.3
Commit: faf79745659130b6da61034e275abbb3bcb3371c
Parents: ad8766e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 11 12:46:31 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 11 19:52:53 2017 +0200
----------------------------------------------------------------------
.../connectors/cassandra/CassandraTupleWriteAheadSink.java | 2 +-
.../cassandra/CassandraTupleWriteAheadSinkTest.java | 4 ++--
.../streaming/runtime/operators/GenericWriteAheadSink.java | 9 +++++++--
.../runtime/operators/GenericWriteAheadSinkTest.java | 4 ++--
4 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index a3d002e..62116ff 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -95,7 +95,7 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite
}
@Override
- protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
+ protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception {
final AtomicInteger updatesCount = new AtomicInteger(0);
final AtomicInteger updatesConfirmed = new AtomicInteger(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
index 847d1a0..2ccba9f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
@@ -113,14 +113,14 @@ public class CassandraTupleWriteAheadSinkTest {
cc
);
- OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness(sink);
+ OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness<>(sink);
harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true);
harness.setup();
sink.open();
// we should leave the loop and return false since we've seen an exception
- assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 0L));
+ assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 1L, 0L));
sink.close();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 35f420a..1ccaff9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -227,6 +227,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
in),
serializer),
serializer),
+ checkpointId,
timestamp);
if (success) {
// in case the checkpoint was successfully committed,
@@ -255,11 +256,15 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
/**
* Write the given element into the backend.
*
- * @param value value to be written
+ * @param values The values to be written
+ * @param checkpointId The checkpoint ID of the checkpoint to be written
+ * @param timestamp The wall-clock timestamp of the checkpoint
+ *
* @return true, if the sending was successful, false otherwise
+ *
* @throws Exception
*/
- protected abstract boolean sendValues(Iterable<IN> value, long timestamp) throws Exception;
+ protected abstract boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/faf79745/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 9bcd2e6..47babfe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -174,7 +174,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
}
@Override
- protected boolean sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
+ protected boolean sendValues(Iterable<Tuple1<Integer>> values, long checkpointId, long timestamp) throws Exception {
for (Tuple1<Integer> value : values) {
this.values.add(value.f0);
}
@@ -224,7 +224,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
}
@Override
- protected boolean sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
+ protected boolean sendValues(Iterable<Tuple1<Integer>> values, long checkpointId, long timestamp) throws Exception {
for (Tuple1<Integer> value : values) {
this.values.add(value.f0);
}