You are viewing a plain-text version of this content. The canonical link for it (not preserved in this plain-text copy) is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/06/28 23:07:10 UTC
[beam] 01/03: [BEAM-7589] Use only one KinesisProducer instance per JVM
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 43bb514b6a400a71f2f3ca2e7f2ddc7cdfd8e459
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Thu Jun 27 18:45:36 2019 +0200
[BEAM-7589] Use only one KinesisProducer instance per JVM
---
.../org/apache/beam/sdk/io/kinesis/KinesisIO.java | 144 ++++++++++-----------
.../beam/sdk/io/kinesis/KinesisMockWriteTest.java | 20 ++-
.../beam/sdk/io/kinesis/KinesisProducerMock.java | 39 +++---
3 files changed, 95 insertions(+), 108 deletions(-)
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 7a5cb52..6e1d951 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -29,13 +29,15 @@ import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.auto.value.AutoValue;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -587,20 +589,35 @@ public final class KinesisIO {
private static class KinesisWriterFn extends DoFn<byte[], Void> {
- private static final int MAX_NUM_RECORDS = 100 * 1000;
private static final int MAX_NUM_FAILURES = 10;
private final KinesisIO.Write spec;
- private transient IKinesisProducer producer;
+ private static transient IKinesisProducer producer;
private transient KinesisPartitioner partitioner;
private transient LinkedBlockingDeque<KinesisWriteException> failures;
+ private transient List<Future<UserRecordResult>> putFutures;
- public KinesisWriterFn(KinesisIO.Write spec) {
+ KinesisWriterFn(KinesisIO.Write spec) {
this.spec = spec;
+ initKinesisProducer();
}
@Setup
- public void setup() throws Exception {
+ public void setup() {
+ // Use custom partitioner if it exists
+ if (spec.getPartitioner() != null) {
+ partitioner = spec.getPartitioner();
+ }
+ }
+
+ @StartBundle
+ public void startBundle() {
+ putFutures = Collections.synchronizedList(new ArrayList<>());
+ /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
+ failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
+ }
+
+ private synchronized void initKinesisProducer() {
// Init producer config
Properties props = spec.getProducerProperties();
if (props == null) {
@@ -614,13 +631,6 @@ public final class KinesisIO {
// Init Kinesis producer
producer = spec.getAWSClientsProvider().createKinesisProducer(config);
- // Use custom partitioner if it exists
- if (spec.getPartitioner() != null) {
- partitioner = spec.getPartitioner();
- }
-
- /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
- failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
}
/**
@@ -638,13 +648,7 @@ public final class KinesisIO {
* the KPL</a>
*/
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- checkForFailures();
-
- // Need to avoid keeping too many futures in producer's map to prevent OOM.
- // In usual case, it should exit immediately.
- flush(MAX_NUM_RECORDS);
-
+ public void processElement(ProcessContext c) {
ByteBuffer data = ByteBuffer.wrap(c.element());
String partitionKey = spec.getPartitionKey();
String explicitHashKey = null;
@@ -657,73 +661,77 @@ public final class KinesisIO {
ListenableFuture<UserRecordResult> f =
producer.addUserRecord(spec.getStreamName(), partitionKey, explicitHashKey, data);
- Futures.addCallback(f, new UserRecordResultFutureCallback());
+ putFutures.add(f);
}
@FinishBundle
public void finishBundle() throws Exception {
- // Flush all outstanding records, blocking call
- flushAll();
-
- checkForFailures();
- }
-
- @Teardown
- public void tearDown() throws Exception {
- if (producer != null) {
- producer.destroy();
- producer = null;
- }
+ flushBundle();
}
/**
- * Flush outstanding records until the total number will be less than required or the number
- * of retries will be exhausted. The retry timeout starts from 1 second and it doubles on
- * every iteration.
+ * Flush outstanding records until the total number of failed records will be less than 0 or
+ * the number of retries will be exhausted. The retry timeout starts from 1 second and it
+ * doubles on every iteration.
*/
- private void flush(int numMax) throws InterruptedException, IOException {
+ private void flushBundle() throws InterruptedException, ExecutionException, IOException {
int retries = spec.getRetries();
- int numOutstandingRecords = producer.getOutstandingRecordsCount();
+ int numFailedRecords;
int retryTimeout = 1000; // initial timeout, 1 sec
+ String message = "";
- while (numOutstandingRecords > numMax && retries-- > 0) {
+ do {
+ numFailedRecords = 0;
producer.flush();
+
+ // Wait for puts to finish and check the results
+ for (Future<UserRecordResult> f : putFutures) {
+ UserRecordResult result = f.get(); // this does block
+ if (!result.isSuccessful()) {
+ numFailedRecords++;
+ }
+ }
+
// wait until outstanding records will be flushed
Thread.sleep(retryTimeout);
- numOutstandingRecords = producer.getOutstandingRecordsCount();
retryTimeout *= 2; // exponential backoff
- }
+ } while (numFailedRecords > 0 && retries-- > 0);
+
+ if (numFailedRecords > 0) {
+ for (Future<UserRecordResult> f : putFutures) {
+ UserRecordResult result = f.get();
+ if (!result.isSuccessful()) {
+ failures.offer(
+ new KinesisWriteException(
+ "Put record was not successful.", new UserRecordFailedException(result)));
+ }
+ }
- if (numOutstandingRecords > numMax) {
- String message =
+ message =
String.format(
- "After [%d] retries, number of outstanding records [%d] is still greater than "
- + "required [%d].",
- spec.getRetries(), numOutstandingRecords, numMax);
+ "After [%d] retries, number of failed records [%d] is still greater than 0",
+ spec.getRetries(), numFailedRecords);
LOG.error(message);
- throw new IOException(message);
}
- }
- private void flushAll() throws InterruptedException, IOException {
- flush(0);
+ checkForFailures(message);
}
/** If any write has asynchronously failed, fail the bundle with a useful error. */
- private void checkForFailures() throws IOException {
- // Note that this function is never called by multiple threads and is the only place that
- // we remove from failures, so this code is safe.
+ private void checkForFailures(String message) throws IOException {
if (failures.isEmpty()) {
return;
}
StringBuilder logEntry = new StringBuilder();
+ logEntry.append(message).append(System.lineSeparator());
+
int i = 0;
while (!failures.isEmpty()) {
i++;
KinesisWriteException exc = failures.remove();
- logEntry.append("\n").append(exc.getMessage());
+ logEntry.append(System.lineSeparator()).append(exc.getMessage());
Throwable cause = exc.getCause();
if (cause != null) {
logEntry.append(": ").append(cause.getMessage());
@@ -733,36 +741,18 @@ public final class KinesisIO {
((UserRecordFailedException) cause).getResult().getAttempts();
for (Attempt attempt : attempts) {
if (attempt.getErrorMessage() != null) {
- logEntry.append("\n").append(attempt.getErrorMessage());
+ logEntry.append(System.lineSeparator()).append(attempt.getErrorMessage());
}
}
}
}
}
- failures.clear();
- String message =
+ String errorMessage =
String.format(
"Some errors occurred writing to Kinesis. First %d errors: %s",
i, logEntry.toString());
- throw new IOException(message);
- }
-
- private class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
-
- @Override
- public void onFailure(Throwable cause) {
- failures.offer(new KinesisWriteException(cause));
- }
-
- @Override
- public void onSuccess(UserRecordResult result) {
- if (!result.isSuccessful()) {
- failures.offer(
- new KinesisWriteException(
- "Put record was not successful.", new UserRecordFailedException(result)));
- }
- }
+ throw new IOException(errorMessage);
}
}
}
@@ -772,9 +762,5 @@ public final class KinesisIO {
KinesisWriteException(String message, Throwable cause) {
super(message, cause);
}
-
- KinesisWriteException(Throwable cause) {
- super(cause);
- }
}
}
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
index 61140c9..78b72c7 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
@@ -135,17 +135,15 @@ public class KinesisMockWriteTest {
Properties properties = new Properties();
properties.setProperty("KinesisPort", "qwe");
- Iterable<byte[]> data = ImmutableList.of("1".getBytes(StandardCharsets.UTF_8));
- p.apply(Create.of(data))
- .apply(
- KinesisIO.write()
- .withStreamName(STREAM)
- .withPartitionKey(PARTITION_KEY)
- .withAWSClientsProvider(new FakeKinesisProvider())
- .withProducerProperties(properties));
+ KinesisIO.Write write =
+ KinesisIO.write()
+ .withStreamName(STREAM)
+ .withPartitionKey(PARTITION_KEY)
+ .withAWSClientsProvider(new FakeKinesisProvider())
+ .withProducerProperties(properties);
- thrown.expect(RuntimeException.class);
- p.run().waitUntilFinish();
+ thrown.expect(IllegalArgumentException.class);
+ write.expand(null);
}
@Test
@@ -183,7 +181,7 @@ public class KinesisMockWriteTest {
.withStreamName(STREAM)
.withPartitionKey(PARTITION_KEY)
.withAWSClientsProvider(new FakeKinesisProvider().setFailedFlush(true))
- .withRetries(1));
+ .withRetries(2));
thrown.expect(RuntimeException.class);
p.run().waitUntilFinish();
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java
index f9f03ce..215beec 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java
@@ -26,8 +26,10 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.DateTime;
/** Simple mock implementation of {@link IKinesisProducer} for testing. */
@@ -35,31 +37,39 @@ public class KinesisProducerMock implements IKinesisProducer {
private boolean isFailedFlush = false;
- private List<UserRecord> addedRecords = new ArrayList<>();
+ private List<UserRecord> addedRecords = Collections.synchronizedList(new ArrayList<>());
private KinesisServiceMock kinesisService = KinesisServiceMock.getInstance();
+ private AtomicInteger seqNumber = new AtomicInteger(0);
+
public KinesisProducerMock() {}
public KinesisProducerMock(KinesisProducerConfiguration config, boolean isFailedFlush) {
this.isFailedFlush = isFailedFlush;
+ this.seqNumber.set(0);
}
@Override
public ListenableFuture<UserRecordResult> addUserRecord(
String stream, String partitionKey, ByteBuffer data) {
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
public ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord) {
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
- public ListenableFuture<UserRecordResult> addUserRecord(
+ public synchronized ListenableFuture<UserRecordResult> addUserRecord(
String stream, String partitionKey, String explicitHashKey, ByteBuffer data) {
+ seqNumber.incrementAndGet();
SettableFuture<UserRecordResult> f = SettableFuture.create();
+ f.set(
+ new UserRecordResult(
+ new ArrayList<>(), String.valueOf(seqNumber.get()), explicitHashKey, !isFailedFlush));
+
if (kinesisService.getExistedStream().equals(stream)) {
addedRecords.add(new UserRecord(stream, partitionKey, explicitHashKey, data));
}
@@ -74,24 +84,24 @@ public class KinesisProducerMock implements IKinesisProducer {
@Override
public List<Metric> getMetrics(String metricName, int windowSeconds)
throws InterruptedException, ExecutionException {
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
public List<Metric> getMetrics(String metricName)
throws InterruptedException, ExecutionException {
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
public List<Metric> getMetrics() throws InterruptedException, ExecutionException {
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
public List<Metric> getMetrics(int windowSeconds)
throws InterruptedException, ExecutionException {
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
@@ -99,16 +109,11 @@ public class KinesisProducerMock implements IKinesisProducer {
@Override
public void flush(String stream) {
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
- public void flush() {
- if (isFailedFlush) {
- // don't flush
- return;
- }
-
+ public synchronized void flush() {
DateTime arrival = DateTime.now();
for (int i = 0; i < addedRecords.size(); i++) {
UserRecord record = addedRecords.get(i);
@@ -120,8 +125,6 @@ public class KinesisProducerMock implements IKinesisProducer {
@Override
public synchronized void flushSync() {
- if (getOutstandingRecordsCount() > 0) {
- flush();
- }
+ throw new UnsupportedOperationException("Not implemented");
}
}