You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/10/25 11:53:02 UTC

[1/3] flink git commit: [FLINK-7637] [kinesis] Fix at-least-once guarantee in FlinkKinesisProducer

Repository: flink
Updated Branches:
  refs/heads/master 9cd8f0595 -> 12b7fc110


[FLINK-7637] [kinesis] Fix at-least-once guarantee in FlinkKinesisProducer

Prior to this commit, the FlinkKinesisProducer did not flush outstanding
KPL records on checkpoints. As with the earlier at-least-once issue in
the Flink Kafka producer, this may lead to data loss if records fail
asynchronously after the checkpoint they were part of has already
completed.

This closes #4871.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/073b82c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/073b82c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/073b82c8

Branch: refs/heads/master
Commit: 073b82c856c0897679d46576ec902c4e5e3a5e32
Parents: 9cd8f05
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Oct 20 17:05:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Oct 25 18:52:22 2017 +0800

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisProducer.java           | 127 +++++--
 .../kinesis/FlinkKinesisProducerTest.java       | 341 +++++++++++++++++--
 .../org/apache/flink/util/ExceptionUtils.java   |  24 ++
 tools/maven/suppressions.xml                    |   2 +-
 4 files changed, 429 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 286de53..2f12e7f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -17,7 +17,11 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -48,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <OUT> Data type to produce into Kinesis Streams
  */
-public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
 
@@ -172,13 +176,16 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 		// check and pass the configuration properties
 		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
 
-		producer = new KinesisProducer(producerConfig);
+		producer = getKinesisProducer(producerConfig);
 		callback = new FutureCallback<UserRecordResult>() {
 			@Override
 			public void onSuccess(UserRecordResult result) {
 				if (!result.isSuccessful()) {
 					if (failOnError) {
-						thrownException = new RuntimeException("Record was not sent successful");
+						// only remember the first thrown exception
+						if (thrownException == null) {
+							thrownException = new RuntimeException("Record was not sent successful");
+						}
 					} else {
 						LOG.warn("Record was not sent successful");
 					}
@@ -207,23 +214,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 		if (this.producer == null) {
 			throw new RuntimeException("Kinesis producer has been closed");
 		}
-		if (thrownException != null) {
-			String errorMessages = "";
-			if (thrownException instanceof UserRecordFailedException) {
-				List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
-				for (Attempt attempt: attempts) {
-					if (attempt.getErrorMessage() != null) {
-						errorMessages += attempt.getErrorMessage() + "\n";
-					}
-				}
-			}
-			if (failOnError) {
-				throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
-			} else {
-				LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
-				thrownException = null; // reset
-			}
-		}
+
+		checkAndPropagateAsyncError();
 
 		String stream = defaultStream;
 		String partition = defaultPartition;
@@ -260,24 +252,91 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	public void close() throws Exception {
 		LOG.info("Closing producer");
 		super.close();
-		KinesisProducer kp = this.producer;
-		this.producer = null;
-		if (kp != null) {
-			LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
+
+		if (producer != null) {
+			LOG.info("Flushing outstanding {} records", producer.getOutstandingRecordsCount());
 			// try to flush all outstanding records
-			while (kp.getOutstandingRecordsCount() > 0) {
-				kp.flush();
-				try {
-					Thread.sleep(500);
-				} catch (InterruptedException e) {
-					LOG.warn("Flushing was interrupted.");
-					// stop the blocking flushing and destroy producer immediately
-					break;
+			flushSync();
+
+			LOG.info("Flushing done. Destroying producer instance.");
+			producer.destroy();
+			producer = null;
+		}
+
+		// make sure we propagate pending errors
+		checkAndPropagateAsyncError();
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// nothing to do
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		// check for asynchronous errors and fail the checkpoint if necessary
+		checkAndPropagateAsyncError();
+
+		flushSync();
+		if (producer.getOutstandingRecordsCount() > 0) {
+			throw new IllegalStateException(
+				"Number of outstanding records must be zero at this point: " + producer.getOutstandingRecordsCount());
+		}
+
+		// if any of the flushed requests failed, we should propagate the error as well and fail the checkpoint
+		checkAndPropagateAsyncError();
+	}
+
+	// --------------------------- Utilities ---------------------------
+
+	/**
+	 * Creates a {@link KinesisProducer}.
+	 * Exposed so that tests can inject mock producers easily.
+	 */
+	@VisibleForTesting
+	protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
+		return new KinesisProducer(producerConfig);
+	}
+
+	/**
+	 * Check if there are any asynchronous exceptions. If so, rethrow the exception.
+	 */
+	private void checkAndPropagateAsyncError() throws Exception {
+		if (thrownException != null) {
+			String errorMessages = "";
+			if (thrownException instanceof UserRecordFailedException) {
+				List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
+				for (Attempt attempt: attempts) {
+					if (attempt.getErrorMessage() != null) {
+						errorMessages += attempt.getErrorMessage() + "\n";
+					}
 				}
 			}
-			LOG.info("Flushing done. Destroying producer instance.");
-			kp.destroy();
+			if (failOnError) {
+				throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
+			} else {
+				LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
+
+				// reset, prevent double throwing
+				thrownException = null;
+			}
 		}
 	}
 
+	/**
+	 * A reimplementation of {@link KinesisProducer#flushSync()}.
+	 * This implementation releases the block on flushing if an interruption occurred.
+	 */
+	private void flushSync() throws Exception {
+		while (producer.getOutstandingRecordsCount() > 0) {
+			producer.flush();
+			try {
+				Thread.sleep(500);
+			} catch (InterruptedException e) {
+				LOG.warn("Flushing was interrupted.");
+
+				break;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index ac03cfe..23c3518 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -18,19 +18,43 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Suite of {@link FlinkKinesisProducer} tests.
@@ -49,22 +73,12 @@ public class FlinkKinesisProducerTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("The provided serialization schema is not serializable");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), testConfig);
+		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties());
 	}
 
 	@Test
 	public void testCreateWithSerializableDeserializer() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), testConfig);
+		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties());
 	}
 
 	@Test
@@ -72,38 +86,191 @@ public class FlinkKinesisProducerTest {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("The provided custom partitioner is not serializable");
 
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
 			.setCustomPartitioner(new NonSerializableCustomPartitioner());
 	}
 
 	@Test
 	public void testConfigureWithSerializableCustomPartitioner() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
 			.setCustomPartitioner(new SerializableCustomPartitioner());
 	}
 
 	@Test
 	public void testConsumerIsSerializable() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
+		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
 		assertTrue(InstantiationUtil.isSerializable(consumer));
 	}
 
 	// ----------------------------------------------------------------------
+	// Tests to verify at-least-once guarantee
+	// ----------------------------------------------------------------------
+
+	/**
+	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
+	 */
+	@SuppressWarnings("ResultOfMethodCallIgnored")
+	@Test
+	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
+		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+
+		producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception"));
+
+		try {
+			testHarness.processElement(new StreamRecord<>("msg-2"));
+		} catch (Exception e) {
+			// the next invoke should rethrow the async exception
+			Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/**
+	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
+	 */
+	@SuppressWarnings("ResultOfMethodCallIgnored")
+	@Test
+	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
+		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+
+		producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception"));
+
+		try {
+			testHarness.snapshot(123L, 123L);
+		} catch (Exception e) {
+			// the next checkpoint should rethrow the async exception
+			Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/**
+	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
+	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
+	 *
+	 * <p>Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
+	 * The test for that is covered in testAtLeastOnceProducer.
+	 */
+	@SuppressWarnings("ResultOfMethodCallIgnored")
+	@Test(timeout = 10000)
+	public void testAsyncErrorRethrownAfterFlush() throws Throwable {
+		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+		testHarness.processElement(new StreamRecord<>("msg-2"));
+		testHarness.processElement(new StreamRecord<>("msg-3"));
+
+		// only let the first record succeed for now
+		UserRecordResult result = mock(UserRecordResult.class);
+		when(result.isSuccessful()).thenReturn(true);
+		producer.getPendingRecordFutures().get(0).set(result);
+
+		CheckedThread snapshotThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				// this should block at first, since there are still two pending records that needs to be flushed
+				testHarness.snapshot(123L, 123L);
+			}
+		};
+		snapshotThread.start();
+
+		// let the 2nd message fail with an async exception
+		producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message"));
+		producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));
+
+		try {
+			snapshotThread.sync();
+		} catch (Exception e) {
+			// after the flush, the async exception should have been rethrown
+			Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent());
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
+	}
+
+	/**
+	 * Test ensuring that the producer is not dropping buffered records;
+	 * we set a timeout because the test will not finish if the logic is broken.
+	 */
+	@SuppressWarnings({"unchecked", "ResultOfMethodCallIgnored"})
+	@Test(timeout = 10000)
+	public void testAtLeastOnceProducer() throws Throwable {
+		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+		testHarness.processElement(new StreamRecord<>("msg-2"));
+		testHarness.processElement(new StreamRecord<>("msg-3"));
+
+		// start a thread to perform checkpointing
+		CheckedThread snapshotThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				// this should block until all records are flushed; if the snapshot implementation returns
+				// before pending records are flushed, the dummy producer's snapshotState throws an exception
+				testHarness.snapshot(123L, 123L);
+			}
+		};
+		snapshotThread.start();
+
+		// before proceeding, make sure that flushing has started and that the snapshot is still blocked;
+		// this would block forever if the snapshot didn't perform a flush
+		producer.waitUntilFlushStarted();
+		Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+		// now, complete the callbacks
+		UserRecordResult result = mock(UserRecordResult.class);
+		when(result.isSuccessful()).thenReturn(true);
+
+		producer.getPendingRecordFutures().get(0).set(result);
+		Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+		producer.getPendingRecordFutures().get(1).set(result);
+		Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+		producer.getPendingRecordFutures().get(2).set(result);
+
+		// this would fail with an exception if flushing wasn't completed before the snapshot method returned
+		snapshotThread.sync();
+
+		testHarness.close();
+	}
+
+	// ----------------------------------------------------------------------
 	// Utility test classes
 	// ----------------------------------------------------------------------
 
@@ -158,4 +325,118 @@ public class FlinkKinesisProducerTest {
 			return "test-partition";
 		}
 	}
+
+	private static class DummyFlinkKinesisProducer<T> extends FlinkKinesisProducer<T> {
+
+		private static final long serialVersionUID = -1212425318784651817L;
+
+		private static final String DUMMY_STREAM = "dummy-stream";
+		private static final String DUMMY_PARTITION = "dummy-partition";
+
+		private transient KinesisProducer mockProducer;
+		private List<SettableFuture<UserRecordResult>> pendingRecordFutures = new LinkedList<>();
+
+		private transient MultiShotLatch flushLatch;
+		private boolean isFlushed;
+
+		DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
+			super(schema, getStandardProperties());
+
+			setDefaultStream(DUMMY_STREAM);
+			setDefaultPartition(DUMMY_PARTITION);
+			setFailOnError(true);
+
+			// set up mock producer
+			this.mockProducer = mock(KinesisProducer.class);
+
+			when(mockProducer.addUserRecord(anyString(), anyString(), anyString(), any(ByteBuffer.class))).thenAnswer(new Answer<Object>() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					SettableFuture<UserRecordResult> future = SettableFuture.create();
+					pendingRecordFutures.add(future);
+					return future;
+				}
+			});
+
+			when(mockProducer.getOutstandingRecordsCount()).thenAnswer(new Answer<Object>() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					return getNumPendingRecordFutures();
+				}
+			});
+
+			doAnswer(new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					flushLatch.trigger();
+
+					while (!isAllRecordFuturesCompleted()) {
+						Thread.sleep(50);
+					}
+
+					isFlushed = true;
+
+					return null;
+				}
+			}).when(mockProducer).flush();
+
+			this.flushLatch = new MultiShotLatch();
+		}
+
+		@Override
+		protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
+			return mockProducer;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			isFlushed = false;
+
+			super.snapshotState(context);
+
+			// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
+			if (!isFlushed) {
+				throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
+			}
+		}
+
+		List<SettableFuture<UserRecordResult>> getPendingRecordFutures() {
+			return pendingRecordFutures;
+		}
+
+		void waitUntilFlushStarted() throws Exception {
+			flushLatch.await();
+		}
+
+		private boolean isAllRecordFuturesCompleted() {
+			for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
+				if (!future.isDone()) {
+					return false;
+				}
+			}
+
+			return true;
+		}
+
+		private int getNumPendingRecordFutures() {
+			int numPending = 0;
+
+			for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
+				if (!future.isDone()) {
+					numPending++;
+				}
+			}
+
+			return numPending;
+		}
+	}
+
+	private static Properties getStandardProperties() {
+		Properties standardProps = new Properties();
+		standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		return standardProps;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 77d4643..c2a9723 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -302,6 +302,30 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether a throwable chain contains a specific error message and returns the corresponding throwable.
+	 *
+	 * @param throwable the throwable chain to check.
+	 * @param searchMessage the error message to search for in the chain.
+	 * @return Optional throwable containing the search message if available, otherwise empty
+	 */
+	public static Optional<Throwable> findThrowableWithMessage(Throwable throwable, String searchMessage) {
+		if (throwable == null || searchMessage == null) {
+			return Optional.empty();
+		}
+
+		Throwable t = throwable;
+		while (t != null) {
+			if (t.getMessage().contains(searchMessage)) {
+				return Optional.of(t);
+			} else {
+				t = t.getCause();
+			}
+		}
+
+		return Optional.empty();
+	}
+
+	/**
 	 * Unpacks an {@link ExecutionException} and returns its cause. Otherwise the given
 	 * Throwable is returned.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index b19435e..d897137 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -34,6 +34,6 @@ under the License.
 			checks="IllegalImport"/>
 		<!-- Kinesis producer has to use guava directly -->
 		<suppress
-			files="FlinkKinesisProducer.java"
+			files="FlinkKinesisProducer.java|FlinkKinesisProducerTest.java"
 			checks="IllegalImport"/>
 </suppressions>


[2/3] flink git commit: [hotfix] [kinesis] Fix improper test name in FlinkKinesisProducerTest

Posted by tz...@apache.org.
[hotfix] [kinesis] Fix improper test name in FlinkKinesisProducerTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf5ef62a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf5ef62a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf5ef62a

Branch: refs/heads/master
Commit: bf5ef62a43879d070ff54ebf09cecd2576e63928
Parents: 073b82c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Oct 20 17:06:36 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Oct 25 18:53:05 2017 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kinesis/FlinkKinesisProducerTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf5ef62a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 23c3518..970116a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -97,7 +97,7 @@ public class FlinkKinesisProducerTest {
 	}
 
 	@Test
-	public void testConsumerIsSerializable() {
+	public void testProducerIsSerializable() {
 		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
 		assertTrue(InstantiationUtil.isSerializable(consumer));
 	}


[3/3] flink git commit: [hotfix] [kinesis] Properly add serialVersionUIDs to FlinkKinesisProducer classes

Posted by tz...@apache.org.
[hotfix] [kinesis] Properly add serialVersionUIDs to FlinkKinesisProducer classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12b7fc11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12b7fc11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12b7fc11

Branch: refs/heads/master
Commit: 12b7fc1109f85561fdbdf93c4655a656b29e8f03
Parents: bf5ef62
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Oct 20 17:11:00 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Oct 25 18:53:14 2017 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/FlinkKinesisProducer.java        |  2 ++
 .../connectors/kinesis/FlinkKinesisProducerTest.java    | 12 ++++++++++++
 2 files changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12b7fc11/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 2f12e7f..28aa4b3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -54,6 +54,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
 
+	private static final long serialVersionUID = 6447077318449477846L;
+
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
 
 	/** Properties to parametrize settings such as AWS service region, access key etc. */

http://git-wip-us.apache.org/repos/asf/flink/blob/12b7fc11/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 970116a..2cd0c17 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -279,6 +279,9 @@ public class FlinkKinesisProducerTest {
 	 * to the enclosing class, which is not serializable) used for testing.
 	 */
 	private final class NonSerializableSerializationSchema implements KinesisSerializationSchema<String> {
+
+		private static final long serialVersionUID = 3361337188490178780L;
+
 		@Override
 		public ByteBuffer serialize(String element) {
 			return ByteBuffer.wrap(element.getBytes());
@@ -294,6 +297,9 @@ public class FlinkKinesisProducerTest {
 	 * A static, serializable {@link KinesisSerializationSchema}.
 	 */
 	private static final class SerializableSerializationSchema implements KinesisSerializationSchema<String> {
+
+		private static final long serialVersionUID = 6298573834520052886L;
+
 		@Override
 		public ByteBuffer serialize(String element) {
 			return ByteBuffer.wrap(element.getBytes());
@@ -310,6 +316,9 @@ public class FlinkKinesisProducerTest {
 	 * to the enclosing class, which is not serializable) used for testing.
 	 */
 	private final class NonSerializableCustomPartitioner extends KinesisPartitioner<String> {
+
+		private static final long serialVersionUID = -5961578876056779161L;
+
 		@Override
 		public String getPartitionId(String element) {
 			return "test-partition";
@@ -320,6 +329,9 @@ public class FlinkKinesisProducerTest {
 	 * A static, serializable {@link KinesisPartitioner}.
 	 */
 	private static final class SerializableCustomPartitioner extends KinesisPartitioner<String> {
+
+		private static final long serialVersionUID = -4996071893997035695L;
+
 		@Override
 		public String getPartitionId(String element) {
 			return "test-partition";