You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/04/21 22:08:10 UTC

[beam] branch release-2.21.0 updated: Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size."

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new 05c92c1  Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size."
     new f48165e  Merge pull request #11486 from lukecwik/master
05c92c1 is described below

commit 05c92c129059d36c8f48e292cd44db3838a2c6d6
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Tue Apr 21 13:06:17 2020 -0700

    Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size."
---
 .../worker/ApplianceShuffleEntryReader.java        |  6 +--
 .../worker/ChunkingShuffleBatchReader.java         |  8 +--
 .../common/worker/CachingShuffleBatchReader.java   | 61 ++++------------------
 .../util/common/worker/ShuffleBatchReader.java     |  5 +-
 .../worker/BatchingShuffleEntryReaderTest.java     | 18 +++----
 .../worker/CachingShuffleBatchReaderTest.java      |  4 +-
 6 files changed, 26 insertions(+), 76 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java
index d184364..71228c5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java
@@ -44,9 +44,9 @@ public class ApplianceShuffleEntryReader implements ShuffleEntryReader {
         new ChunkingShuffleBatchReader(executionContext, operationContext, applianceShuffleReader);
 
     if (cache) {
-      // Limit the size of the cache to ~32 full shuffle batches.
-      final long maxBytes = 128L * 1024 * 1024;
-      batchReader = new CachingShuffleBatchReader(batchReader, maxBytes);
+      // Limit the size of the cache.
+      final int maxBatches = 32;
+      batchReader = new CachingShuffleBatchReader(batchReader, maxBatches);
     }
     entryReader = new BatchingShuffleEntryReader(batchReader);
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
index 8939fa7..7f25c81 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
@@ -59,18 +59,14 @@ final class ChunkingShuffleBatchReader implements ShuffleBatchReader {
     }
     DataInputStream input = new DataInputStream(new ByteArrayInputStream(result.chunk));
     ArrayList<ShuffleEntry> entries = new ArrayList<>();
-    long batchSize = 0;
     while (input.available() > 0) {
-      ShuffleEntry entry = getShuffleEntry(input);
-      batchSize += entry.length();
-      entries.add(entry);
+      entries.add(getShuffleEntry(input));
     }
     return new Batch(
         entries,
         result.nextStartPosition == null
             ? null
-            : ByteArrayShufflePosition.of(result.nextStartPosition),
-        batchSize);
+            : ByteArrayShufflePosition.of(result.nextStartPosition));
   }
 
   /**
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
index c769881..fc87bc4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
@@ -27,41 +27,26 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Weigher;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
 
 /** A {@link ShuffleBatchReader} that caches batches as they're read. */
 public class CachingShuffleBatchReader implements ShuffleBatchReader {
   private final ShuffleBatchReader reader;
   @VisibleForTesting final LoadingCache<BatchRange, Batch> cache;
 
-  /**
-   * Limit the size of the cache to 1GiB of batches.
-   *
-   * <p>If this increases beyond Integer.MAX_VALUE then {@link BatchWeigher} must be updated.
-   * Because a batch may be larger than 1GiB, the actual in-memory batch size may exceed this value.
-   */
-  private static final int MAXIMUM_WEIGHT = 1024 * 1024 * 1024;
+  /** Limit the size of the cache to 1000 batches. */
+  private static final int MAXIMUM_BATCHES = 1000;
 
   // Ensure that batches in the cache are expired quickly
   // for improved GC performance.
-  private static final Duration EXPIRE_AFTER = Duration.ofMillis(250);
+  private static final long EXPIRE_AFTER_MS = 250;
 
-  /**
-   * Creates the caching reader.
-   *
-   * @param shuffleReader wrapped reader.
-   * @param maximumWeightBytes maximum bytes for the cache.
-   * @param expireAfterAccess cache items may be evicted after the elapsed duration.
-   */
   public CachingShuffleBatchReader(
-      ShuffleBatchReader shuffleReader, long maximumWeightBytes, Duration expireAfterAccess) {
+      ShuffleBatchReader shuffleReader, int maximumBatches, long expireAfterAccessMillis) {
     this.reader = shuffleReader;
     this.cache =
         CacheBuilder.newBuilder()
-            .maximumWeight(maximumWeightBytes)
-            .weigher(new BatchWeigher())
-            .expireAfterAccess(expireAfterAccess)
+            .maximumSize(maximumBatches)
+            .expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS)
             .<BatchRange, Batch>build(
                 new CacheLoader<BatchRange, Batch>() {
                   @Override
@@ -73,24 +58,12 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader {
                 });
   }
 
-  /**
-   * Creates the caching reader with a maximum size of {@link MAXIMUM_WEIGHT} and an element expiry
-   * duration of {@link EXPIRE_AFTER}.
-   *
-   * @param shuffleReader wrapped reader.
-   */
   public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader) {
-    this(shuffleReader, MAXIMUM_WEIGHT, EXPIRE_AFTER);
+    this(shuffleReader, MAXIMUM_BATCHES, EXPIRE_AFTER_MS);
   }
 
-  /**
-   * Creates the caching reader with an element expiry duration of {@link EXPIRE_AFTER}.
-   *
-   * @param shuffleReader wrapped reader.
-   * @param maximumWeightBytes maximum bytes for the cache.
-   */
-  public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, long maximumWeightBytes) {
-    this(shuffleReader, maximumWeightBytes, EXPIRE_AFTER);
+  public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, int maximumBatches) {
+    this(shuffleReader, maximumBatches, EXPIRE_AFTER_MS);
   }
 
   @Override
@@ -129,18 +102,4 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader {
       return Objects.hashCode(startPosition, endPosition);
     }
   }
-
-  /**
-   * Returns the weight of a Batch, in bytes, within the range [0, Integer.MAX_VALUE].
-   *
-   * <p>The cache holds {@link MAX_WEIGHT} bytes. If {@link MAX_WEIGHT} is increased beyond
-   * Integer.MAX_VALUE bytes, a new weighing heuristic will be required to avoid under representing
-   * the number of bytes in memory.
-   */
-  static final class BatchWeigher implements Weigher<BatchRange, Batch> {
-    @Override
-    public int weigh(BatchRange key, Batch value) {
-      return Ints.saturatedCast(value.bytes);
-    }
-  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java
index 29890c8..d1676eb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java
@@ -30,13 +30,10 @@ public interface ShuffleBatchReader {
   public static class Batch {
     public final List<ShuffleEntry> entries;
     @Nullable public final ShufflePosition nextStartPosition;
-    public final long bytes;
 
-    public Batch(
-        List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition, long bytes) {
+    public Batch(List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition) {
       this.entries = entries;
       this.nextStartPosition = nextStartPosition;
-      this.bytes = bytes;
     }
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java
index faa4b05..9ee670b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java
@@ -18,9 +18,9 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
 import static com.google.api.client.util.Lists.newArrayList;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
@@ -68,9 +68,8 @@ public final class BatchingShuffleEntryReaderTest {
     ArrayList<ShuffleEntry> entries = new ArrayList<>();
     entries.add(e1);
     entries.add(e2);
-    long batchSize = (long) e1.length() + e2.length();
     when(batchReader.read(START_POSITION, END_POSITION))
-        .thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize));
+        .thenReturn(new ShuffleBatchReader.Batch(entries, null));
     List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
     assertThat(results, contains(e1, e2));
   }
@@ -82,9 +81,8 @@ public final class BatchingShuffleEntryReaderTest {
     ArrayList<ShuffleEntry> entries = new ArrayList<>();
     entries.add(e1);
     entries.add(e2);
-    long batchSize = (long) e1.length() + e2.length();
     when(batchReader.read(START_POSITION, END_POSITION))
-        .thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize));
+        .thenReturn(new ShuffleBatchReader.Batch(entries, null));
     Reiterator<ShuffleEntry> it = reader.read(START_POSITION, END_POSITION);
     assertThat(it.hasNext(), equalTo(Boolean.TRUE));
     assertThat(it.next(), equalTo(e1));
@@ -104,9 +102,9 @@ public final class BatchingShuffleEntryReaderTest {
     ShuffleEntry e2 = new ShuffleEntry(KEY, SKEY, VALUE);
     List<ShuffleEntry> e2s = Collections.singletonList(e2);
     when(batchReader.read(START_POSITION, END_POSITION))
-        .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, e1.length()));
+        .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION));
     when(batchReader.read(NEXT_START_POSITION, END_POSITION))
-        .thenReturn(new ShuffleBatchReader.Batch(e2s, null, e2.length()));
+        .thenReturn(new ShuffleBatchReader.Batch(e2s, null));
     List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
     assertThat(results, contains(e1, e2));
 
@@ -122,11 +120,11 @@ public final class BatchingShuffleEntryReaderTest {
     ShuffleEntry e3 = new ShuffleEntry(KEY, SKEY, VALUE);
     List<ShuffleEntry> e3s = Collections.singletonList(e3);
     when(batchReader.read(START_POSITION, END_POSITION))
-        .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, 0));
+        .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION));
     when(batchReader.read(NEXT_START_POSITION, END_POSITION))
-        .thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION, 0));
+        .thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION));
     when(batchReader.read(SECOND_NEXT_START_POSITION, END_POSITION))
-        .thenReturn(new ShuffleBatchReader.Batch(e3s, null, e3.length()));
+        .thenReturn(new ShuffleBatchReader.Batch(e3s, null));
     List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
     assertThat(results, contains(e3));
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
index 27dbc1d..88db3b2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -37,7 +37,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public final class CachingShuffleBatchReaderTest {
   private final ShuffleBatchReader.Batch testBatch =
-      new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null, 0);
+      new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null);
 
   @Test
   public void readerShouldCacheReads() throws IOException {