You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/06/28 23:07:10 UTC

[beam] 01/03: [BEAM-7589] Use only one KinesisProducer instance per JVM

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 43bb514b6a400a71f2f3ca2e7f2ddc7cdfd8e459
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Thu Jun 27 18:45:36 2019 +0200

    [BEAM-7589] Use only one KinesisProducer instance per JVM
---
 .../org/apache/beam/sdk/io/kinesis/KinesisIO.java  | 144 ++++++++++-----------
 .../beam/sdk/io/kinesis/KinesisMockWriteTest.java  |  20 ++-
 .../beam/sdk/io/kinesis/KinesisProducerMock.java   |  39 +++---
 3 files changed, 95 insertions(+), 108 deletions(-)

diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 7a5cb52..6e1d951 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -29,13 +29,15 @@ import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
 import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.auto.value.AutoValue;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -587,20 +589,35 @@ public final class KinesisIO {
 
     private static class KinesisWriterFn extends DoFn<byte[], Void> {
 
-      private static final int MAX_NUM_RECORDS = 100 * 1000;
       private static final int MAX_NUM_FAILURES = 10;
 
       private final KinesisIO.Write spec;
-      private transient IKinesisProducer producer;
+      private static transient IKinesisProducer producer;
       private transient KinesisPartitioner partitioner;
       private transient LinkedBlockingDeque<KinesisWriteException> failures;
+      private transient List<Future<UserRecordResult>> putFutures;
 
-      public KinesisWriterFn(KinesisIO.Write spec) {
+      KinesisWriterFn(KinesisIO.Write spec) {
         this.spec = spec;
+        initKinesisProducer();
       }
 
       @Setup
-      public void setup() throws Exception {
+      public void setup() {
+        // Use custom partitioner if it exists
+        if (spec.getPartitioner() != null) {
+          partitioner = spec.getPartitioner();
+        }
+      }
+
+      @StartBundle
+      public void startBundle() {
+        putFutures = Collections.synchronizedList(new ArrayList<>());
+        // Keep only the first MAX_NUM_FAILURES exceptions that occur; later ones are dropped.
+        failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
+      }
+
+      private synchronized void initKinesisProducer() {
         // Init producer config
         Properties props = spec.getProducerProperties();
         if (props == null) {
@@ -614,13 +631,6 @@ public final class KinesisIO {
 
         // Init Kinesis producer
         producer = spec.getAWSClientsProvider().createKinesisProducer(config);
-        // Use custom partitioner if it exists
-        if (spec.getPartitioner() != null) {
-          partitioner = spec.getPartitioner();
-        }
-
-        /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
-        failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
       }
 
       /**
@@ -638,13 +648,7 @@ public final class KinesisIO {
        * the KPL</a>
        */
       @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        checkForFailures();
-
-        // Need to avoid keeping too many futures in producer's map to prevent OOM.
-        // In usual case, it should exit immediately.
-        flush(MAX_NUM_RECORDS);
-
+      public void processElement(ProcessContext c) {
         ByteBuffer data = ByteBuffer.wrap(c.element());
         String partitionKey = spec.getPartitionKey();
         String explicitHashKey = null;
@@ -657,73 +661,77 @@ public final class KinesisIO {
 
         ListenableFuture<UserRecordResult> f =
             producer.addUserRecord(spec.getStreamName(), partitionKey, explicitHashKey, data);
-        Futures.addCallback(f, new UserRecordResultFutureCallback());
+        putFutures.add(f);
       }
 
       @FinishBundle
       public void finishBundle() throws Exception {
-        // Flush all outstanding records, blocking call
-        flushAll();
-
-        checkForFailures();
-      }
-
-      @Teardown
-      public void tearDown() throws Exception {
-        if (producer != null) {
-          producer.destroy();
-          producer = null;
-        }
+        flushBundle();
       }
 
       /**
-       * Flush outstanding records until the total number will be less than required or the number
-       * of retries will be exhausted. The retry timeout starts from 1 second and it doubles on
-       * every iteration.
+       * Flush outstanding records until there are no failed records left or the number of
+       * retries is exhausted. The retry timeout starts at 1 second and doubles on every
+       * iteration.
        */
-      private void flush(int numMax) throws InterruptedException, IOException {
+      private void flushBundle() throws InterruptedException, ExecutionException, IOException {
         int retries = spec.getRetries();
-        int numOutstandingRecords = producer.getOutstandingRecordsCount();
+        int numFailedRecords;
         int retryTimeout = 1000; // initial timeout, 1 sec
+        String message = "";
 
-        while (numOutstandingRecords > numMax && retries-- > 0) {
+        do {
+          numFailedRecords = 0;
           producer.flush();
+
+          // Wait for puts to finish and check the results
+          for (Future<UserRecordResult> f : putFutures) {
+            UserRecordResult result = f.get(); // this does block
+            if (!result.isSuccessful()) {
+              numFailedRecords++;
+            }
+          }
+
           // wait until outstanding records will be flushed
           Thread.sleep(retryTimeout);
-          numOutstandingRecords = producer.getOutstandingRecordsCount();
           retryTimeout *= 2; // exponential backoff
-        }
+        } while (numFailedRecords > 0 && retries-- > 0);
+
+        if (numFailedRecords > 0) {
+          for (Future<UserRecordResult> f : putFutures) {
+            UserRecordResult result = f.get();
+            if (!result.isSuccessful()) {
+              failures.offer(
+                  new KinesisWriteException(
+                      "Put record was not successful.", new UserRecordFailedException(result)));
+            }
+          }
 
-        if (numOutstandingRecords > numMax) {
-          String message =
+          message =
               String.format(
-                  "After [%d] retries, number of outstanding records [%d] is still greater than "
-                      + "required [%d].",
-                  spec.getRetries(), numOutstandingRecords, numMax);
+                  "After [%d] retries, number of failed records [%d] is still greater than 0",
+                  spec.getRetries(), numFailedRecords);
           LOG.error(message);
-          throw new IOException(message);
         }
-      }
 
-      private void flushAll() throws InterruptedException, IOException {
-        flush(0);
+        checkForFailures(message);
       }
 
       /** If any write has asynchronously failed, fail the bundle with a useful error. */
-      private void checkForFailures() throws IOException {
-        // Note that this function is never called by multiple threads and is the only place that
-        // we remove from failures, so this code is safe.
+      private void checkForFailures(String message) throws IOException {
         if (failures.isEmpty()) {
           return;
         }
 
         StringBuilder logEntry = new StringBuilder();
+        logEntry.append(message).append(System.lineSeparator());
+
         int i = 0;
         while (!failures.isEmpty()) {
           i++;
           KinesisWriteException exc = failures.remove();
 
-          logEntry.append("\n").append(exc.getMessage());
+          logEntry.append(System.lineSeparator()).append(exc.getMessage());
           Throwable cause = exc.getCause();
           if (cause != null) {
             logEntry.append(": ").append(cause.getMessage());
@@ -733,36 +741,18 @@ public final class KinesisIO {
                   ((UserRecordFailedException) cause).getResult().getAttempts();
               for (Attempt attempt : attempts) {
                 if (attempt.getErrorMessage() != null) {
-                  logEntry.append("\n").append(attempt.getErrorMessage());
+                  logEntry.append(System.lineSeparator()).append(attempt.getErrorMessage());
                 }
               }
             }
           }
         }
-        failures.clear();
 
-        String message =
+        String errorMessage =
             String.format(
                 "Some errors occurred writing to Kinesis. First %d errors: %s",
                 i, logEntry.toString());
-        throw new IOException(message);
-      }
-
-      private class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
-
-        @Override
-        public void onFailure(Throwable cause) {
-          failures.offer(new KinesisWriteException(cause));
-        }
-
-        @Override
-        public void onSuccess(UserRecordResult result) {
-          if (!result.isSuccessful()) {
-            failures.offer(
-                new KinesisWriteException(
-                    "Put record was not successful.", new UserRecordFailedException(result)));
-          }
-        }
+        throw new IOException(errorMessage);
       }
     }
   }
@@ -772,9 +762,5 @@ public final class KinesisIO {
     KinesisWriteException(String message, Throwable cause) {
       super(message, cause);
     }
-
-    KinesisWriteException(Throwable cause) {
-      super(cause);
-    }
   }
 }
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
index 61140c9..78b72c7 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
@@ -135,17 +135,15 @@ public class KinesisMockWriteTest {
     Properties properties = new Properties();
     properties.setProperty("KinesisPort", "qwe");
 
-    Iterable<byte[]> data = ImmutableList.of("1".getBytes(StandardCharsets.UTF_8));
-    p.apply(Create.of(data))
-        .apply(
-            KinesisIO.write()
-                .withStreamName(STREAM)
-                .withPartitionKey(PARTITION_KEY)
-                .withAWSClientsProvider(new FakeKinesisProvider())
-                .withProducerProperties(properties));
+    KinesisIO.Write write =
+        KinesisIO.write()
+            .withStreamName(STREAM)
+            .withPartitionKey(PARTITION_KEY)
+            .withAWSClientsProvider(new FakeKinesisProvider())
+            .withProducerProperties(properties);
 
-    thrown.expect(RuntimeException.class);
-    p.run().waitUntilFinish();
+    thrown.expect(IllegalArgumentException.class);
+    write.expand(null);
   }
 
   @Test
@@ -183,7 +181,7 @@ public class KinesisMockWriteTest {
                 .withStreamName(STREAM)
                 .withPartitionKey(PARTITION_KEY)
                 .withAWSClientsProvider(new FakeKinesisProvider().setFailedFlush(true))
-                .withRetries(1));
+                .withRetries(2));
 
     thrown.expect(RuntimeException.class);
     p.run().waitUntilFinish();
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java
index f9f03ce..215beec 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java
@@ -26,8 +26,10 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.joda.time.DateTime;
 
 /** Simple mock implementation of {@link IKinesisProducer} for testing. */
@@ -35,31 +37,39 @@ public class KinesisProducerMock implements IKinesisProducer {
 
   private boolean isFailedFlush = false;
 
-  private List<UserRecord> addedRecords = new ArrayList<>();
+  private List<UserRecord> addedRecords = Collections.synchronizedList(new ArrayList<>());
 
   private KinesisServiceMock kinesisService = KinesisServiceMock.getInstance();
 
+  private AtomicInteger seqNumber = new AtomicInteger(0);
+
   public KinesisProducerMock() {}
 
   public KinesisProducerMock(KinesisProducerConfiguration config, boolean isFailedFlush) {
     this.isFailedFlush = isFailedFlush;
+    this.seqNumber.set(0);
   }
 
   @Override
   public ListenableFuture<UserRecordResult> addUserRecord(
       String stream, String partitionKey, ByteBuffer data) {
-    throw new RuntimeException("Not implemented");
+    throw new UnsupportedOperationException("Not implemented");
   }
 
   @Override
   public ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord) {
-    throw new RuntimeException("Not implemented");
+    throw new UnsupportedOperationException("Not implemented");
   }
 
   @Override
-  public ListenableFuture<UserRecordResult> addUserRecord(
+  public synchronized ListenableFuture<UserRecordResult> addUserRecord(
       String stream, String partitionKey, String explicitHashKey, ByteBuffer data) {
+    seqNumber.incrementAndGet();
     SettableFuture<UserRecordResult> f = SettableFuture.create();
+    f.set(
+        new UserRecordResult(
+            new ArrayList<>(), String.valueOf(seqNumber.get()), explicitHashKey, !isFailedFlush));
+
     if (kinesisService.getExistedStream().equals(stream)) {
       addedRecords.add(new UserRecord(stream, partitionKey, explicitHashKey, data));
     }
@@ -74,24 +84,24 @@ public class KinesisProducerMock implements IKinesisProducer {
   @Override
   public List<Metric> getMetrics(String metricName, int windowSeconds)
       throws InterruptedException, ExecutionException {
-    throw new RuntimeException("Not implemented");
+    throw new UnsupportedOperationException("Not implemented");
   }
 
   @Override
   public List<Metric> getMetrics(String metricName)
       throws InterruptedException, ExecutionException {
-    throw new RuntimeException("Not implemented");
+    throw new UnsupportedOperationException("Not implemented");
   }
 
   @Override
   public List<Metric> getMetrics() throws InterruptedException, ExecutionException {
-    throw new RuntimeException("Not implemented");
+    throw new UnsupportedOperationException("Not implemented");
   }
 
   @Override
   public List<Metric> getMetrics(int windowSeconds)
       throws InterruptedException, ExecutionException {
-    throw new RuntimeException("Not implemented");
+    throw new UnsupportedOperationException("Not implemented");
   }
 
   @Override
@@ -99,16 +109,11 @@ public class KinesisProducerMock implements IKinesisProducer {
 
   @Override
   public void flush(String stream) {
-    throw new RuntimeException("Not implemented");
+    throw new UnsupportedOperationException("Not implemented");
   }
 
   @Override
-  public void flush() {
-    if (isFailedFlush) {
-      // don't flush
-      return;
-    }
-
+  public synchronized void flush() {
     DateTime arrival = DateTime.now();
     for (int i = 0; i < addedRecords.size(); i++) {
       UserRecord record = addedRecords.get(i);
@@ -120,8 +125,6 @@ public class KinesisProducerMock implements IKinesisProducer {
 
   @Override
   public synchronized void flushSync() {
-    if (getOutstandingRecordsCount() > 0) {
-      flush();
-    }
+    throw new UnsupportedOperationException("Not implemented");
   }
 }