You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/10 10:09:32 UTC
flink git commit: [FLINK-4500] CassandraSinkBase implements
CheckpointedFunction
Repository: flink
Updated Branches:
refs/heads/master de58523b5 -> 775d7fed1
[FLINK-4500] CassandraSinkBase implements CheckpointedFunction
This closes #4605.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/775d7fed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/775d7fed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/775d7fed
Branch: refs/heads/master
Commit: 775d7fed1ac8230c92997ead3c702004679614a4
Parents: de58523
Author: Michael Fong <mc...@gmail.com>
Authored: Mon Aug 14 20:57:06 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Nov 10 11:09:13 2017 +0100
----------------------------------------------------------------------
.../connectors/cassandra/CassandraSinkBase.java | 59 +++--
.../cassandra/CassandraSinkBaseTest.java | 248 +++++++++++++++++++
.../connectors/cassandra/ResultSetFutures.java | 104 ++++++++
tools/maven/suppressions.xml | 2 +-
4 files changed, 395 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 5da1f57..7a6efd9 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -17,8 +17,12 @@
package org.apache.flink.streaming.connectors.cassandra;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.datastax.driver.core.Cluster;
@@ -37,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* @param <IN> Type of the elements emitted by this sink
*/
-public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
+public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected transient Cluster cluster;
protected transient Session session;
@@ -86,9 +90,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) throws Exception {
- if (exception != null) {
- throw new IOException("Error while sending value.", exception);
- }
+ checkAsyncErrors();
ListenableFuture<V> result = send(value);
updatesPending.incrementAndGet();
Futures.addCallback(result, callback);
@@ -99,19 +101,9 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
@Override
public void close() throws Exception {
try {
- if (exception != null) {
- throw new IOException("Error while sending value.", exception);
- }
-
- while (updatesPending.get() > 0) {
- synchronized (updatesPending) {
- updatesPending.wait();
- }
- }
-
- if (exception != null) {
- throw new IOException("Error while sending value.", exception);
- }
+ checkAsyncErrors();
+ waitForPendingUpdates();
+ checkAsyncErrors();
} finally {
try {
if (session != null) {
@@ -129,4 +121,37 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
}
}
}
+
+ /**
+ * Intentionally empty: this sink reads nothing from the initialization context.
+ * The method exists to satisfy {@link CheckpointedFunction}.
+ */
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ }
+
+ /**
+ * Flushes the sink on checkpoint: reports any asynchronous failure recorded so far,
+ * blocks until no updates are pending, and then checks again for a failure recorded
+ * while waiting.
+ *
+ * @throws Exception an {@link IOException} wrapping a recorded asynchronous failure, or an
+ * {@link InterruptedException} if interrupted while waiting for pending updates
+ */
+ @Override
+ public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+ // fail fast if an earlier write already failed
+ checkAsyncErrors();
+ waitForPendingUpdates();
+ // writes that completed while waiting may have failed as well
+ checkAsyncErrors();
+ }
+
+	/**
+	 * Blocks until {@code updatesPending} has dropped to zero.
+	 *
+	 * <p>The counter is checked while holding the monitor of {@code updatesPending}. Checking it
+	 * outside the monitor leaves a window in which the last pending update can complete and
+	 * notify between the check and the call to {@code wait()}; that notification would be lost
+	 * and this method would block forever. Because a notifier must own the same monitor to call
+	 * {@code notify()}/{@code notifyAll()}, it cannot signal until this thread has released the
+	 * monitor inside {@code wait()}.
+	 *
+	 * @throws InterruptedException if the calling thread is interrupted while waiting
+	 */
+	private void waitForPendingUpdates() throws InterruptedException {
+		synchronized (updatesPending) {
+			// loop to guard against spurious wake-ups and wake-ups with updates still pending
+			while (updatesPending.get() > 0) {
+				updatesPending.wait();
+			}
+		}
+	}
+
+ private void checkAsyncErrors() throws Exception {
+ Throwable error = exception;
+ if (error != null) {
+ // prevent throwing duplicated error
+ exception = null;
+ throw new IOException("Error while sending value.", error);
+ }
+ }
+
+ /**
+ * Returns the current value of the {@code updatesPending} counter. Exposed for tests only.
+ */
+ @VisibleForTesting
+ int getNumOfPendingRecords() {
+ return updatesPending.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
new file mode 100644
index 0000000..8c2b367
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.queryablestate.FutureUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ *
+ * <p>Except for {@link #testHostNotFoundErrorHandling()}, all tests run against a mocked
+ * {@link Cluster} and {@link Session} and control the outcome of each write through a future
+ * that the test completes itself.
+ */
+public class CassandraSinkBaseTest {
+
+	/**
+	 * Opening the sink with a real cluster builder pointing at 127.0.0.1 is expected to fail
+	 * with a {@link NoHostAvailableException}.
+	 */
+	@Test(expected = NoHostAvailableException.class)
+	public void testHostNotFoundErrorHandling() throws Exception {
+		CassandraSinkBase<Object, ResultSet> base = new CassandraSinkBase<Object, ResultSet>(new ClusterBuilder() {
+			@Override
+			protected Cluster buildCluster(Cluster.Builder builder) {
+				return builder
+					.addContactPoint("127.0.0.1")
+					.withoutJMXReporting()
+					.withoutMetrics().build();
+			}
+		}) {
+			@Override
+			public ListenableFuture<ResultSet> send(Object value) {
+				return null;
+			}
+		};
+
+		base.open(new Configuration());
+	}
+
+	/** A write whose future is already completed leaves no pending records behind. */
+	@Test(timeout = 5000)
+	public void testSuccessfulPath() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+		casSinkFunc.open(new Configuration());
+
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
+		casSinkFunc.invoke("hello");
+
+		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+
+		casSinkFunc.close();
+	}
+
+	/** A failed write must surface as an {@link IOException} when the sink is closed. */
+	@Test(timeout = 5000)
+	public void testThrowErrorOnClose() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		casSinkFunc.open(new Configuration());
+
+		Exception cause = new RuntimeException();
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+		casSinkFunc.invoke("hello");
+		try {
+			casSinkFunc.close();
+
+			Assert.fail("Close should have thrown an exception.");
+		} catch (IOException e) {
+			Assert.assertEquals(cause, e.getCause());
+			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+		}
+	}
+
+	/** A failed write must surface as an {@link IOException} on the next {@code invoke()}. */
+	@Test(timeout = 5000)
+	public void testThrowErrorOnInvoke() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		casSinkFunc.open(new Configuration());
+
+		Exception cause = new RuntimeException();
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+
+		casSinkFunc.invoke("hello");
+
+		try {
+			casSinkFunc.invoke("world");
+			Assert.fail("Sending of second value should have failed.");
+		} catch (IOException e) {
+			Assert.assertEquals(cause, e.getCause());
+			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+		}
+	}
+
+	/** A failed write must make the next snapshot fail with the {@link IOException} as cause. */
+	@Test(timeout = 5000)
+	public void testThrowErrorOnSnapshot() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+
+		testHarness.open();
+
+		Exception cause = new RuntimeException();
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+
+		casSinkFunc.invoke("hello");
+
+		try {
+			testHarness.snapshot(123L, 123L);
+
+			Assert.fail();
+		} catch (Exception e) {
+			Assert.assertTrue(e.getCause() instanceof IOException);
+			Assert.assertEquals(cause, e.getCause().getCause());
+			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+		}
+
+		testHarness.close();
+	}
+
+	/** A snapshot must block while a write is in flight and finish once the write completes. */
+	@Test(timeout = 5000)
+	public void testWaitForPendingUpdatesOnSnapshot() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+
+		testHarness.open();
+
+		CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+		ResultSetFuture resultSetFuture = ResultSetFutures.fromCompletableFuture(completableFuture);
+		casSinkFunc.setResultFuture(resultSetFuture);
+
+		casSinkFunc.invoke("hello");
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+
+		// keep the CheckedThread type so that sync() can re-throw errors from the snapshot
+		CheckedThread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
+			@Override
+			public void go() throws Exception {
+				testHarness.snapshot(123L, 123L);
+			}
+		};
+		t.start();
+		while (t.getState() != Thread.State.WAITING) {
+			Thread.sleep(5);
+		}
+
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+		completableFuture.complete(null);
+
+		// wait for the snapshot to return and propagate any exception it threw;
+		// without this a failing or hanging snapshot would go unnoticed
+		t.sync();
+		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+
+		testHarness.close();
+	}
+
+	/** Closing must block while a write is in flight and finish once the write completes. */
+	@Test(timeout = 5000)
+	public void testWaitForPendingUpdatesOnClose() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+
+		testHarness.open();
+
+		CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+		ResultSetFuture resultSetFuture = ResultSetFutures.fromCompletableFuture(completableFuture);
+		casSinkFunc.setResultFuture(resultSetFuture);
+
+		casSinkFunc.invoke("hello");
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+
+		// keep the CheckedThread type so that sync() can re-throw errors from close()
+		CheckedThread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
+			@Override
+			public void go() throws Exception {
+				testHarness.close();
+			}
+		};
+		t.start();
+		while (t.getState() != Thread.State.WAITING) {
+			Thread.sleep(5);
+		}
+
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+		completableFuture.complete(null);
+
+		// wait for close() to return and propagate any exception it threw
+		t.sync();
+		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+	}
+
+	/**
+	 * Sink under test: connects to a mocked cluster and answers every {@code send()} with the
+	 * future installed via {@link #setResultFuture(ResultSetFuture)}.
+	 */
+	private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> {
+
+		private static final ClusterBuilder builder;
+		private static final Cluster cluster;
+		private static final Session session;
+
+		static {
+			cluster = mock(Cluster.class);
+
+			session = mock(Session.class);
+			when(cluster.connect()).thenReturn(session);
+
+			builder = new ClusterBuilder() {
+				@Override
+				protected Cluster buildCluster(Cluster.Builder builder) {
+					return cluster;
+				}
+			};
+		}
+
+		private ResultSetFuture result;
+
+		TestCassandraSink() {
+			super(builder);
+		}
+
+		void setResultFuture(ResultSetFuture result) {
+			Preconditions.checkNotNull(result);
+			this.result = result;
+		}
+
+		@Override
+		public ListenableFuture<ResultSet> send(String value) {
+			return result;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java
new file mode 100644
index 0000000..20b80ec
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class to create {@link com.datastax.driver.core.ResultSetFuture}s.
+ */
+class ResultSetFutures {
+
+ private ResultSetFutures() {
+ }
+
+ static ResultSetFuture fromCompletableFuture(CompletableFuture<ResultSet> future) {
+ checkNotNull(future);
+ return new CompletableResultSetFuture(future);
+ }
+
+ private static class CompletableResultSetFuture implements ResultSetFuture {
+
+ private final CompletableFuture<ResultSet> completableFuture;
+
+ CompletableResultSetFuture(CompletableFuture<ResultSet> future) {
+ this.completableFuture = future;
+ }
+
+ @Override
+ public ResultSet getUninterruptibly() {
+ try {
+ return completableFuture.get();
+ } catch (InterruptedException e) {
+ return getUninterruptibly();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ResultSet getUninterruptibly(long l, TimeUnit timeUnit) throws TimeoutException {
+ try {
+ return completableFuture.get(l, timeUnit);
+ } catch (InterruptedException e) {
+ return getUninterruptibly();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean cancel(boolean b) {
+ return completableFuture.cancel(b);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return completableFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return completableFuture.isDone();
+ }
+
+ @Override
+ public ResultSet get() throws InterruptedException, ExecutionException {
+ return completableFuture.get();
+ }
+
+ @Override
+ public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return completableFuture.get(timeout, unit);
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ completableFuture.whenComplete((result, error) -> listener.run());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index a58e17c..19ec17a 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -29,7 +29,7 @@ under the License.
<!-- Cassandra connectors have to use guava directly -->
<suppress
- files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
+ files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
checks="IllegalImport"/>
<!-- Kinesis producer has to use guava directly -->
<suppress