You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/10 10:09:32 UTC

flink git commit: [FLINK-4500] CassandraSinkBase implements CheckpointedFunction

Repository: flink
Updated Branches:
  refs/heads/master de58523b5 -> 775d7fed1


[FLINK-4500] CassandraSinkBase implements CheckpointedFunction

This closes #4605.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/775d7fed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/775d7fed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/775d7fed

Branch: refs/heads/master
Commit: 775d7fed1ac8230c92997ead3c702004679614a4
Parents: de58523
Author: Michael Fong <mc...@gmail.com>
Authored: Mon Aug 14 20:57:06 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Nov 10 11:09:13 2017 +0100

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraSinkBase.java |  59 +++--
 .../cassandra/CassandraSinkBaseTest.java        | 248 +++++++++++++++++++
 .../connectors/cassandra/ResultSetFutures.java  | 104 ++++++++
 tools/maven/suppressions.xml                    |   2 +-
 4 files changed, 395 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 5da1f57..7a6efd9 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -17,8 +17,12 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
 import com.datastax.driver.core.Cluster;
@@ -37,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * @param <IN> Type of the elements emitted by this sink
  */
-public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
+public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 	protected transient Cluster cluster;
 	protected transient Session session;
@@ -86,9 +90,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	@Override
 	public void invoke(IN value) throws Exception {
-		if (exception != null) {
-			throw new IOException("Error while sending value.", exception);
-		}
+		checkAsyncErrors();
 		ListenableFuture<V> result = send(value);
 		updatesPending.incrementAndGet();
 		Futures.addCallback(result, callback);
@@ -99,19 +101,9 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	@Override
 	public void close() throws Exception {
 		try {
-			if (exception != null) {
-				throw new IOException("Error while sending value.", exception);
-			}
-
-			while (updatesPending.get() > 0) {
-				synchronized (updatesPending) {
-					updatesPending.wait();
-				}
-			}
-
-			if (exception != null) {
-				throw new IOException("Error while sending value.", exception);
-			}
+			checkAsyncErrors();
+			waitForPendingUpdates();
+			checkAsyncErrors();
 		} finally {
 			try {
 				if (session != null) {
@@ -129,4 +121,37 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 			}
 		}
 	}
+
+	/**
+	 * Intentionally a no-op: this base class restores no state on (re)start. It implements
+	 * {@link CheckpointedFunction} so that {@link #snapshotState(FunctionSnapshotContext)} is invoked
+	 * on every checkpoint.
+	 */
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// nothing to initialize or restore
+	}
+
+	/**
+	 * Flushes all in-flight writes as part of a checkpoint.
+	 *
+	 * <p>First rethrows any asynchronous error that was already recorded. It then blocks until the
+	 * pending-update counter drops to zero, and finally rethrows any error recorded by those
+	 * remaining writes. When this method returns normally, no record passed to {@code invoke}
+	 * before the snapshot is still in flight.
+	 *
+	 * @throws Exception (an {@link IOException}) if an asynchronous write failed, or
+	 *         {@link InterruptedException} if interrupted while waiting
+	 */
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		// failures reported before this checkpoint started
+		checkAsyncErrors();
+		waitForPendingUpdates();
+		// failures reported by the writes we just waited for
+		checkAsyncErrors();
+	}
+
+	/**
+	 * Blocks until the pending-update counter (incremented once per record in {@code invoke})
+	 * drops to zero.
+	 *
+	 * <p>The counter is re-checked while holding the {@code updatesPending} monitor. Any thread
+	 * calling {@code notify}/{@code notifyAll} on that object must hold the same monitor, so a
+	 * notification can no longer slip in between the check and {@code wait()}. That gap previously
+	 * left this method waiting forever. The loop also guards against spurious wake-ups.
+	 *
+	 * @throws InterruptedException if the calling thread is interrupted while waiting
+	 */
+	private void waitForPendingUpdates() throws InterruptedException {
+		synchronized (updatesPending) {
+			while (updatesPending.get() > 0) {
+				updatesPending.wait();
+			}
+		}
+	}
+
+	private void checkAsyncErrors() throws Exception {
+		Throwable error = exception;
+		if (error != null) {
+			// prevent throwing duplicated error
+			exception = null;
+			throw new IOException("Error while sending value.", error);
+		}
+	}
+
+	@VisibleForTesting
+	int getNumOfPendingRecords() {
+		return updatesPending.get();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
new file mode 100644
index 0000000..8c2b367
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.queryablestate.FutureUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+	@Test(expected = NoHostAvailableException.class)
+	public void testHostNotFoundErrorHandling() throws Exception {
+		CassandraSinkBase<Object, ResultSet> base = new CassandraSinkBase<Object, ResultSet>(new ClusterBuilder() {
+			@Override
+			protected Cluster buildCluster(Cluster.Builder builder) {
+				return builder
+					.addContactPoint("127.0.0.1")
+					.withoutJMXReporting()
+					.withoutMetrics().build();
+			}
+		}) {
+			@Override
+			public ListenableFuture<ResultSet> send(Object value) {
+				return null;
+			}
+		};
+
+		base.open(new Configuration());
+	}
+
+	@Test(timeout = 5000)
+	public void testSuccessfulPath() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+		casSinkFunc.open(new Configuration());
+
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
+		casSinkFunc.invoke("hello");
+
+		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+
+		casSinkFunc.close();
+	}
+
+	@Test(timeout = 5000)
+	public void testThrowErrorOnClose() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		casSinkFunc.open(new Configuration());
+
+		Exception cause = new RuntimeException();
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+		casSinkFunc.invoke("hello");
+		try {
+			casSinkFunc.close();
+
+			Assert.fail("Close should have thrown an exception.");
+		} catch (IOException e) {
+			Assert.assertEquals(cause, e.getCause());
+			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+		}
+	}
+
+	@Test(timeout = 5000)
+	public void testThrowErrorOnInvoke() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		casSinkFunc.open(new Configuration());
+
+		Exception cause = new RuntimeException();
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+
+		casSinkFunc.invoke("hello");
+
+		try {
+			casSinkFunc.invoke("world");
+			Assert.fail("Sending of second value should have failed.");
+		} catch (IOException e) {
+			Assert.assertEquals(cause, e.getCause());
+			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+		}
+
+		// the error was already reported by invoke, so closing must succeed and release resources
+		casSinkFunc.close();
+	}
+
+	@Test(timeout = 5000)
+	public void testThrowErrorOnSnapshot() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+
+		testHarness.open();
+
+		Exception cause = new RuntimeException();
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+
+		casSinkFunc.invoke("hello");
+
+		try {
+			testHarness.snapshot(123L, 123L);
+
+			Assert.fail("Snapshot should have thrown an exception.");
+		} catch (Exception e) {
+			Assert.assertTrue(e.getCause() instanceof IOException);
+			Assert.assertEquals(cause, e.getCause().getCause());
+			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+		}
+
+		testHarness.close();
+	}
+
+	@Test(timeout = 5000)
+	public void testWaitForPendingUpdatesOnSnapshot() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+
+		testHarness.open();
+
+		CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+		ResultSetFuture resultSetFuture = ResultSetFutures.fromCompletableFuture(completableFuture);
+		casSinkFunc.setResultFuture(resultSetFuture);
+
+		casSinkFunc.invoke("hello");
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+
+		CheckedThread snapshotThread = new CheckedThread("Flink-CassandraSinkBaseTest") {
+			@Override
+			public void go() throws Exception {
+				testHarness.snapshot(123L, 123L);
+			}
+		};
+		snapshotThread.start();
+		// the snapshot must block until the pending write completes
+		while (snapshotThread.getState() != Thread.State.WAITING) {
+			Thread.sleep(5);
+		}
+
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+		completableFuture.complete(null);
+		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+
+		// join the snapshot thread and rethrow anything it failed with
+		snapshotThread.sync();
+
+		testHarness.close();
+	}
+
+	@Test(timeout = 5000)
+	public void testWaitForPendingUpdatesOnClose() throws Exception {
+		TestCassandraSink casSinkFunc = new TestCassandraSink();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+
+		testHarness.open();
+
+		CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+		ResultSetFuture resultSetFuture = ResultSetFutures.fromCompletableFuture(completableFuture);
+		casSinkFunc.setResultFuture(resultSetFuture);
+
+		casSinkFunc.invoke("hello");
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+
+		CheckedThread closeThread = new CheckedThread("Flink-CassandraSinkBaseTest") {
+			@Override
+			public void go() throws Exception {
+				testHarness.close();
+			}
+		};
+		closeThread.start();
+		// close must block until the pending write completes
+		while (closeThread.getState() != Thread.State.WAITING) {
+			Thread.sleep(5);
+		}
+
+		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+		completableFuture.complete(null);
+		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+
+		// join the closing thread and rethrow anything it failed with
+		closeThread.sync();
+	}
+
+	/**
+	 * Sink backed by a mocked {@link Cluster}/{@link Session} whose {@code send} returns a
+	 * future chosen by the test.
+	 */
+	private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> {
+
+		private static final ClusterBuilder builder;
+		private static final Cluster cluster;
+		private static final Session session;
+
+		static {
+			cluster = mock(Cluster.class);
+
+			session = mock(Session.class);
+			when(cluster.connect()).thenReturn(session);
+
+			builder = new ClusterBuilder() {
+				@Override
+				protected Cluster buildCluster(Cluster.Builder builder) {
+					return cluster;
+				}
+			};
+		}
+
+		private ResultSetFuture result;
+
+		TestCassandraSink() {
+			super(builder);
+		}
+
+		void setResultFuture(ResultSetFuture result) {
+			Preconditions.checkNotNull(result);
+			this.result = result;
+		}
+
+		@Override
+		public ListenableFuture<ResultSet> send(String value) {
+			return result;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java
new file mode 100644
index 0000000..20b80ec
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/ResultSetFutures.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class to create {@link com.datastax.driver.core.ResultSetFuture}s.
+ */
+class ResultSetFutures {
+
+	private ResultSetFutures() {
+	}
+
+	static ResultSetFuture fromCompletableFuture(CompletableFuture<ResultSet> future) {
+		checkNotNull(future);
+		return new CompletableResultSetFuture(future);
+	}
+
+	private static class CompletableResultSetFuture implements ResultSetFuture {
+
+		private final CompletableFuture<ResultSet> completableFuture;
+
+		CompletableResultSetFuture(CompletableFuture<ResultSet> future) {
+			this.completableFuture = future;
+		}
+
+		@Override
+		public ResultSet getUninterruptibly() {
+			try {
+				return completableFuture.get();
+			} catch (InterruptedException e) {
+				return getUninterruptibly();
+			} catch (ExecutionException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public ResultSet getUninterruptibly(long l, TimeUnit timeUnit) throws TimeoutException {
+			try {
+				return completableFuture.get(l, timeUnit);
+			} catch (InterruptedException e) {
+				return getUninterruptibly();
+			} catch (ExecutionException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public boolean cancel(boolean b) {
+			return completableFuture.cancel(b);
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return completableFuture.isCancelled();
+		}
+
+		@Override
+		public boolean isDone() {
+			return completableFuture.isDone();
+		}
+
+		@Override
+		public ResultSet get() throws InterruptedException, ExecutionException {
+			return completableFuture.get();
+		}
+
+		@Override
+		public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			return completableFuture.get(timeout, unit);
+		}
+
+		@Override
+		public void addListener(Runnable listener, Executor executor) {
+			completableFuture.whenComplete((result, error) -> listener.run());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/775d7fed/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index a58e17c..19ec17a 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -29,7 +29,7 @@ under the License.
 
 		<!-- Cassandra connectors have to use guava directly -->
 		<suppress
-			files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
+			files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
 			checks="IllegalImport"/>
 		<!-- Kinesis producer has to use guava directly -->
 		<suppress