You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/10/25 11:53:02 UTC
[1/3] flink git commit: [FLINK-7637] [kinesis] Fix at-least-once
guarantee in FlinkKinesisProducer
Repository: flink
Updated Branches:
refs/heads/master 9cd8f0595 -> 12b7fc110
[FLINK-7637] [kinesis] Fix at-least-once guarantee in FlinkKinesisProducer
Prior to this commit, there is no flushing of KPL outstanding records on
checkpoints in the FlinkKinesisProducer. As with the earlier at-least-once
issue in the Flink Kafka producer, this may lead to data loss if
records fail asynchronously after a checkpoint that they
were part of has completed.
This closes #4871.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/073b82c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/073b82c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/073b82c8
Branch: refs/heads/master
Commit: 073b82c856c0897679d46576ec902c4e5e3a5e32
Parents: 9cd8f05
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Oct 20 17:05:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Oct 25 18:52:22 2017 +0800
----------------------------------------------------------------------
.../kinesis/FlinkKinesisProducer.java | 127 +++++--
.../kinesis/FlinkKinesisProducerTest.java | 341 +++++++++++++++++--
.../org/apache/flink/util/ExceptionUtils.java | 24 ++
tools/maven/suppressions.xml | 2 +-
4 files changed, 429 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 286de53..2f12e7f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -17,7 +17,11 @@
package org.apache.flink.streaming.connectors.kinesis;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -48,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* @param <OUT> Data type to produce into Kinesis Streams
*/
-public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
@@ -172,13 +176,16 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
// check and pass the configuration properties
KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
- producer = new KinesisProducer(producerConfig);
+ producer = getKinesisProducer(producerConfig);
callback = new FutureCallback<UserRecordResult>() {
@Override
public void onSuccess(UserRecordResult result) {
if (!result.isSuccessful()) {
if (failOnError) {
- thrownException = new RuntimeException("Record was not sent successful");
+ // only remember the first thrown exception
+ if (thrownException == null) {
+ thrownException = new RuntimeException("Record was not sent successful");
+ }
} else {
LOG.warn("Record was not sent successful");
}
@@ -207,23 +214,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
if (this.producer == null) {
throw new RuntimeException("Kinesis producer has been closed");
}
- if (thrownException != null) {
- String errorMessages = "";
- if (thrownException instanceof UserRecordFailedException) {
- List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
- for (Attempt attempt: attempts) {
- if (attempt.getErrorMessage() != null) {
- errorMessages += attempt.getErrorMessage() + "\n";
- }
- }
- }
- if (failOnError) {
- throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
- } else {
- LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
- thrownException = null; // reset
- }
- }
+
+ checkAndPropagateAsyncError();
String stream = defaultStream;
String partition = defaultPartition;
@@ -260,24 +252,91 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
public void close() throws Exception {
LOG.info("Closing producer");
super.close();
- KinesisProducer kp = this.producer;
- this.producer = null;
- if (kp != null) {
- LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
+
+ if (producer != null) {
+ LOG.info("Flushing outstanding {} records", producer.getOutstandingRecordsCount());
// try to flush all outstanding records
- while (kp.getOutstandingRecordsCount() > 0) {
- kp.flush();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- LOG.warn("Flushing was interrupted.");
- // stop the blocking flushing and destroy producer immediately
- break;
+ flushSync();
+
+ LOG.info("Flushing done. Destroying producer instance.");
+ producer.destroy();
+ producer = null;
+ }
+
+ // make sure we propagate pending errors
+ checkAndPropagateAsyncError();
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // nothing to do
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ // check for asynchronous errors and fail the checkpoint if necessary
+ checkAndPropagateAsyncError();
+
+ flushSync();
+ if (producer.getOutstandingRecordsCount() > 0) {
+ throw new IllegalStateException(
+ "Number of outstanding records must be zero at this point: " + producer.getOutstandingRecordsCount());
+ }
+
+ // if any of the flushed requests failed, propagate the error as well and fail the checkpoint
+ checkAndPropagateAsyncError();
+ }
+
+ // --------------------------- Utilities ---------------------------
+
+ /**
+ * Creates a {@link KinesisProducer}.
+ * Exposed so that tests can inject mock producers easily.
+ */
+ @VisibleForTesting
+ protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
+ return new KinesisProducer(producerConfig);
+ }
+
+ /**
+ * Checks {@code thrownException}: if set and {@code failOnError} is true, rethrows it wrapped in a RuntimeException (the field is not reset); otherwise logs it and resets it.
+ */
+ private void checkAndPropagateAsyncError() throws Exception {
+ if (thrownException != null) {
+ String errorMessages = "";
+ if (thrownException instanceof UserRecordFailedException) {
+ List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
+ for (Attempt attempt: attempts) {
+ if (attempt.getErrorMessage() != null) {
+ errorMessages += attempt.getErrorMessage() + "\n";
+ }
}
}
- LOG.info("Flushing done. Destroying producer instance.");
- kp.destroy();
+ if (failOnError) {
+ throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
+ } else {
+ LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);
+ // the throwable must be the LAST argument so SLF4J fills {} with errorMessages and logs the stack trace
+ // reset, prevent double throwing
+ thrownException = null;
+ }
}
}
+ /**
+ * A reimplementation of {@link KinesisProducer#flushSync()}: flushes, then sleeps 500 ms, repeating until no records are outstanding.
+ * If interrupted, it stops waiting (records may still be outstanding) and restores the thread's interrupt status for the caller.
+ */
+ private void flushSync() throws Exception {
+ while (producer.getOutstandingRecordsCount() > 0) {
+ producer.flush();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOG.warn("Flushing was interrupted.");
+ Thread.currentThread().interrupt(); // do not swallow the interruption; keep the flag visible to callers
+ break;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index ac03cfe..23c3518 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -18,19 +18,43 @@
package org.apache.flink.streaming.connectors.kinesis;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Suite of {@link FlinkKinesisProducer} tests.
@@ -49,22 +73,12 @@ public class FlinkKinesisProducerTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided serialization schema is not serializable");
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), testConfig);
+ new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties());
}
@Test
public void testCreateWithSerializableDeserializer() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new SerializableSerializationSchema(), testConfig);
+ new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties());
}
@Test
@@ -72,38 +86,191 @@ public class FlinkKinesisProducerTest {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The provided custom partitioner is not serializable");
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+ new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
.setCustomPartitioner(new NonSerializableCustomPartitioner());
}
@Test
public void testConfigureWithSerializableCustomPartitioner() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
+ new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
.setCustomPartitioner(new SerializableCustomPartitioner());
}
@Test
public void testConsumerIsSerializable() {
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
+ FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
assertTrue(InstantiationUtil.isSerializable(consumer));
}
// ----------------------------------------------------------------------
+ // Tests to verify at-least-once guarantee
+ // ----------------------------------------------------------------------
+
+ /**
+ * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void testAsyncErrorRethrownOnInvoke() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception"));
+
+ try {
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+ Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception"));
+
+ try {
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ // the next checkpoint should rethrow the async exception
+ Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
+ * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
+ *
+ * <p>Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
+ * The test for that is covered in testAtLeastOnceProducer.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test(timeout = 10000)
+ public void testAsyncErrorRethrownAfterFlush() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+
+ // only let the first record succeed for now
+ UserRecordResult result = mock(UserRecordResult.class);
+ when(result.isSuccessful()).thenReturn(true);
+ producer.getPendingRecordFutures().get(0).set(result);
+
+ CheckedThread snapshotThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ // this should block at first, since there are still two pending records that needs to be flushed
+ testHarness.snapshot(123L, 123L);
+ }
+ };
+ snapshotThread.start();
+
+ // let the 2nd message fail with an async exception
+ producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message"));
+ producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));
+
+ try {
+ snapshotThread.sync();
+ } catch (Exception e) {
+ // after the flush, the async exception should have been rethrown
+ Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent());
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that the producer is not dropping buffered records;
+ * we set a timeout because the test will not finish if the logic is broken.
+ */
+ @SuppressWarnings({"unchecked", "ResultOfMethodCallIgnored"})
+ @Test(timeout = 10000)
+ public void testAtLeastOnceProducer() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+
+ // start a thread to perform checkpointing
+ CheckedThread snapshotThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ // this should block until all records are flushed; if the snapshot implementation returns
+ // before pending records are flushed, DummyFlinkKinesisProducer#snapshotState throws and fails the test
+ testHarness.snapshot(123L, 123L);
+ }
+ };
+ snapshotThread.start();
+
+ // before proceeding, make sure that flushing has started and that the snapshot is still blocked;
+ // this would block forever if the snapshot didn't perform a flush
+ producer.waitUntilFlushStarted();
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+ // now, complete the callbacks
+ UserRecordResult result = mock(UserRecordResult.class);
+ when(result.isSuccessful()).thenReturn(true);
+
+ producer.getPendingRecordFutures().get(0).set(result);
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+ producer.getPendingRecordFutures().get(1).set(result);
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+ producer.getPendingRecordFutures().get(2).set(result);
+
+ // this would fail with an exception if flushing wasn't completed before the snapshot method returned
+ snapshotThread.sync();
+
+ testHarness.close();
+ }
+
+ // ----------------------------------------------------------------------
// Utility test classes
// ----------------------------------------------------------------------
@@ -158,4 +325,118 @@ public class FlinkKinesisProducerTest {
return "test-partition";
}
}
+
+ private static class DummyFlinkKinesisProducer<T> extends FlinkKinesisProducer<T> {
+
+ private static final long serialVersionUID = -1212425318784651817L;
+
+ private static final String DUMMY_STREAM = "dummy-stream";
+ private static final String DUMMY_PARTITION = "dummy-partition";
+
+ private transient KinesisProducer mockProducer;
+ private List<SettableFuture<UserRecordResult>> pendingRecordFutures = new LinkedList<>();
+
+ private transient MultiShotLatch flushLatch;
+ private boolean isFlushed;
+
+ DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
+ super(schema, getStandardProperties());
+
+ setDefaultStream(DUMMY_STREAM);
+ setDefaultPartition(DUMMY_PARTITION);
+ setFailOnError(true);
+
+ // set up mock producer
+ this.mockProducer = mock(KinesisProducer.class);
+
+ when(mockProducer.addUserRecord(anyString(), anyString(), anyString(), any(ByteBuffer.class))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ SettableFuture<UserRecordResult> future = SettableFuture.create();
+ pendingRecordFutures.add(future);
+ return future;
+ }
+ });
+
+ when(mockProducer.getOutstandingRecordsCount()).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return getNumPendingRecordFutures();
+ }
+ });
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ flushLatch.trigger();
+
+ while (!isAllRecordFuturesCompleted()) {
+ Thread.sleep(50);
+ }
+
+ isFlushed = true;
+
+ return null;
+ }
+ }).when(mockProducer).flush();
+
+ this.flushLatch = new MultiShotLatch();
+ }
+
+ @Override
+ protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
+ return mockProducer;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ isFlushed = false;
+
+ super.snapshotState(context);
+
+ // if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
+ if (!isFlushed) {
+ throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
+ }
+ }
+
+ List<SettableFuture<UserRecordResult>> getPendingRecordFutures() {
+ return pendingRecordFutures;
+ }
+
+ void waitUntilFlushStarted() throws Exception {
+ flushLatch.await();
+ }
+
+ private boolean isAllRecordFuturesCompleted() {
+ for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
+ if (!future.isDone()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private int getNumPendingRecordFutures() {
+ int numPending = 0;
+
+ for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
+ if (!future.isDone()) {
+ numPending++;
+ }
+ }
+
+ return numPending;
+ }
+ }
+
+ private static Properties getStandardProperties() {
+ Properties standardProps = new Properties();
+ standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+ standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ return standardProps;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 77d4643..c2a9723 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -302,6 +302,30 @@ public final class ExceptionUtils {
}
/**
+ * Checks whether a throwable chain contains a specific error message and returns the corresponding throwable.
+ *
+ * @param throwable the throwable chain to check.
+ * @param searchMessage the error message to search for in the chain.
+ * @return Optional throwable containing the search message if available, otherwise empty
+ */
+ public static Optional<Throwable> findThrowableWithMessage(Throwable throwable, String searchMessage) {
+ if (throwable == null || searchMessage == null) {
+ return Optional.empty();
+ }
+
+ Throwable t = throwable;
+ while (t != null) {
+ // getMessage() may be null (e.g. new RuntimeException()); skip such links instead of throwing an NPE
+ if (t.getMessage() != null && t.getMessage().contains(searchMessage)) {
+ return Optional.of(t);
+ }
+ t = t.getCause();
+ }
+
+ return Optional.empty();
+ }
+
+ /**
* Unpacks an {@link ExecutionException} and returns its cause. Otherwise the given
* Throwable is returned.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/073b82c8/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index b19435e..d897137 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -34,6 +34,6 @@ under the License.
checks="IllegalImport"/>
<!-- Kinesis producer has to use guava directly -->
<suppress
- files="FlinkKinesisProducer.java"
+ files="FlinkKinesisProducer.java|FlinkKinesisProducerTest.java"
checks="IllegalImport"/>
</suppressions>
[2/3] flink git commit: [hotfix] [kinesis] Fix inproper test name in
FlinkKinesisProducerTest
Posted by tz...@apache.org.
[hotfix] [kinesis] Fix inproper test name in FlinkKinesisProducerTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf5ef62a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf5ef62a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf5ef62a
Branch: refs/heads/master
Commit: bf5ef62a43879d070ff54ebf09cecd2576e63928
Parents: 073b82c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Oct 20 17:06:36 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Oct 25 18:53:05 2017 +0800
----------------------------------------------------------------------
.../streaming/connectors/kinesis/FlinkKinesisProducerTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf5ef62a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 23c3518..970116a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -97,7 +97,7 @@ public class FlinkKinesisProducerTest {
}
@Test
- public void testConsumerIsSerializable() {
+ public void testProducerIsSerializable() {
FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
assertTrue(InstantiationUtil.isSerializable(consumer));
}
[3/3] flink git commit: [hotfix] [kinesis] Properly add
serialVersionUIDs to FlinkKinesisProducer classes
Posted by tz...@apache.org.
[hotfix] [kinesis] Properly add serialVersionUIDs to FlinkKinesisProducer classes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12b7fc11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12b7fc11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12b7fc11
Branch: refs/heads/master
Commit: 12b7fc1109f85561fdbdf93c4655a656b29e8f03
Parents: bf5ef62
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Oct 20 17:11:00 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Oct 25 18:53:14 2017 +0800
----------------------------------------------------------------------
.../connectors/kinesis/FlinkKinesisProducer.java | 2 ++
.../connectors/kinesis/FlinkKinesisProducerTest.java | 12 ++++++++++++
2 files changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/12b7fc11/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 2f12e7f..28aa4b3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -54,6 +54,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+ private static final long serialVersionUID = 6447077318449477846L;
+
private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
/** Properties to parametrize settings such as AWS service region, access key etc. */
http://git-wip-us.apache.org/repos/asf/flink/blob/12b7fc11/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 970116a..2cd0c17 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -279,6 +279,9 @@ public class FlinkKinesisProducerTest {
* to the enclosing class, which is not serializable) used for testing.
*/
private final class NonSerializableSerializationSchema implements KinesisSerializationSchema<String> {
+
+ private static final long serialVersionUID = 3361337188490178780L;
+
@Override
public ByteBuffer serialize(String element) {
return ByteBuffer.wrap(element.getBytes());
@@ -294,6 +297,9 @@ public class FlinkKinesisProducerTest {
* A static, serializable {@link KinesisSerializationSchema}.
*/
private static final class SerializableSerializationSchema implements KinesisSerializationSchema<String> {
+
+ private static final long serialVersionUID = 6298573834520052886L;
+
@Override
public ByteBuffer serialize(String element) {
return ByteBuffer.wrap(element.getBytes());
@@ -310,6 +316,9 @@ public class FlinkKinesisProducerTest {
* to the enclosing class, which is not serializable) used for testing.
*/
private final class NonSerializableCustomPartitioner extends KinesisPartitioner<String> {
+
+ private static final long serialVersionUID = -5961578876056779161L;
+
@Override
public String getPartitionId(String element) {
return "test-partition";
@@ -320,6 +329,9 @@ public class FlinkKinesisProducerTest {
* A static, serializable {@link KinesisPartitioner}.
*/
private static final class SerializableCustomPartitioner extends KinesisPartitioner<String> {
+
+ private static final long serialVersionUID = -4996071893997035695L;
+
@Override
public String getPartitionId(String element) {
return "test-partition";