You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/04/21 22:08:10 UTC
[beam] branch release-2.21.0 updated: Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size."
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new 05c92c1 Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size."
new f48165e Merge pull request #11486 from lukecwik/master
05c92c1 is described below
commit 05c92c129059d36c8f48e292cd44db3838a2c6d6
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Tue Apr 21 13:06:17 2020 -0700
Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size."
---
.../worker/ApplianceShuffleEntryReader.java | 6 +--
.../worker/ChunkingShuffleBatchReader.java | 8 +--
.../common/worker/CachingShuffleBatchReader.java | 61 ++++------------------
.../util/common/worker/ShuffleBatchReader.java | 5 +-
.../worker/BatchingShuffleEntryReaderTest.java | 18 +++----
.../worker/CachingShuffleBatchReaderTest.java | 4 +-
6 files changed, 26 insertions(+), 76 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java
index d184364..71228c5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java
@@ -44,9 +44,9 @@ public class ApplianceShuffleEntryReader implements ShuffleEntryReader {
new ChunkingShuffleBatchReader(executionContext, operationContext, applianceShuffleReader);
if (cache) {
- // Limit the size of the cache to ~32 full shuffle batches.
- final long maxBytes = 128L * 1024 * 1024;
- batchReader = new CachingShuffleBatchReader(batchReader, maxBytes);
+ // Limit the size of the cache.
+ final int maxBatches = 32;
+ batchReader = new CachingShuffleBatchReader(batchReader, maxBatches);
}
entryReader = new BatchingShuffleEntryReader(batchReader);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
index 8939fa7..7f25c81 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
@@ -59,18 +59,14 @@ final class ChunkingShuffleBatchReader implements ShuffleBatchReader {
}
DataInputStream input = new DataInputStream(new ByteArrayInputStream(result.chunk));
ArrayList<ShuffleEntry> entries = new ArrayList<>();
- long batchSize = 0;
while (input.available() > 0) {
- ShuffleEntry entry = getShuffleEntry(input);
- batchSize += entry.length();
- entries.add(entry);
+ entries.add(getShuffleEntry(input));
}
return new Batch(
entries,
result.nextStartPosition == null
? null
- : ByteArrayShufflePosition.of(result.nextStartPosition),
- batchSize);
+ : ByteArrayShufflePosition.of(result.nextStartPosition));
}
/**
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
index c769881..fc87bc4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;
import java.io.IOException;
-import java.time.Duration;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
@@ -27,41 +27,26 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Weigher;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
/** A {@link ShuffleBatchReader} that caches batches as they're read. */
public class CachingShuffleBatchReader implements ShuffleBatchReader {
private final ShuffleBatchReader reader;
@VisibleForTesting final LoadingCache<BatchRange, Batch> cache;
- /**
- * Limit the size of the cache to 1GiB of batches.
- *
- * <p>If this increases beyond Integer.MAX_VALUE then {@link BatchWeigher} must be updated.
- * Because a batch may be larger than 1GiB, the actual in-memory batch size may exceed this value.
- */
- private static final int MAXIMUM_WEIGHT = 1024 * 1024 * 1024;
+ /** Limit the size of the cache to 1000 batches. */
+ private static final int MAXIMUM_BATCHES = 1000;
// Ensure that batches in the cache are expired quickly
// for improved GC performance.
- private static final Duration EXPIRE_AFTER = Duration.ofMillis(250);
+ private static final long EXPIRE_AFTER_MS = 250;
- /**
- * Creates the caching reader.
- *
- * @param shuffleReader wrapped reader.
- * @param maximumWeightBytes maximum bytes for the cache.
- * @param expireAfterAccess cache items may be evicted after the elapsed duration.
- */
public CachingShuffleBatchReader(
- ShuffleBatchReader shuffleReader, long maximumWeightBytes, Duration expireAfterAccess) {
+ ShuffleBatchReader shuffleReader, int maximumBatches, long expireAfterAccessMillis) {
this.reader = shuffleReader;
this.cache =
CacheBuilder.newBuilder()
- .maximumWeight(maximumWeightBytes)
- .weigher(new BatchWeigher())
- .expireAfterAccess(expireAfterAccess)
+ .maximumSize(maximumBatches)
+ .expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS)
.<BatchRange, Batch>build(
new CacheLoader<BatchRange, Batch>() {
@Override
@@ -73,24 +58,12 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader {
});
}
- /**
- * Creates the caching reader with a maximum size of {@link MAXIMUM_WEIGHT} and an element expiry
- * duration of {@link EXPIRE_AFTER}.
- *
- * @param shuffleReader wrapped reader.
- */
public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader) {
- this(shuffleReader, MAXIMUM_WEIGHT, EXPIRE_AFTER);
+ this(shuffleReader, MAXIMUM_BATCHES, EXPIRE_AFTER_MS);
}
- /**
- * Creates the caching reader with an element expiry duration of {@link EXPIRE_AFTER}.
- *
- * @param shuffleReader wrapped reader.
- * @param maximumWeightBytes maximum bytes for the cache.
- */
- public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, long maximumWeightBytes) {
- this(shuffleReader, maximumWeightBytes, EXPIRE_AFTER);
+ public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, int maximumBatches) {
+ this(shuffleReader, maximumBatches, EXPIRE_AFTER_MS);
}
@Override
@@ -129,18 +102,4 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader {
return Objects.hashCode(startPosition, endPosition);
}
}
-
- /**
- * Returns the weight of a Batch, in bytes, within the range [0, Integer.MAX_VALUE].
- *
- * <p>The cache holds {@link MAX_WEIGHT} bytes. If {@link MAX_WEIGHT} is increased beyond
- * Integer.MAX_VALUE bytes, a new weighing heuristic will be required to avoid under representing
- * the number of bytes in memory.
- */
- static final class BatchWeigher implements Weigher<BatchRange, Batch> {
- @Override
- public int weigh(BatchRange key, Batch value) {
- return Ints.saturatedCast(value.bytes);
- }
- }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java
index 29890c8..d1676eb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java
@@ -30,13 +30,10 @@ public interface ShuffleBatchReader {
public static class Batch {
public final List<ShuffleEntry> entries;
@Nullable public final ShufflePosition nextStartPosition;
- public final long bytes;
- public Batch(
- List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition, long bytes) {
+ public Batch(List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition) {
this.entries = entries;
this.nextStartPosition = nextStartPosition;
- this.bytes = bytes;
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java
index faa4b05..9ee670b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java
@@ -18,9 +18,9 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;
import static com.google.api.client.util.Lists.newArrayList;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -68,9 +68,8 @@ public final class BatchingShuffleEntryReaderTest {
ArrayList<ShuffleEntry> entries = new ArrayList<>();
entries.add(e1);
entries.add(e2);
- long batchSize = (long) e1.length() + e2.length();
when(batchReader.read(START_POSITION, END_POSITION))
- .thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize));
+ .thenReturn(new ShuffleBatchReader.Batch(entries, null));
List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
assertThat(results, contains(e1, e2));
}
@@ -82,9 +81,8 @@ public final class BatchingShuffleEntryReaderTest {
ArrayList<ShuffleEntry> entries = new ArrayList<>();
entries.add(e1);
entries.add(e2);
- long batchSize = (long) e1.length() + e2.length();
when(batchReader.read(START_POSITION, END_POSITION))
- .thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize));
+ .thenReturn(new ShuffleBatchReader.Batch(entries, null));
Reiterator<ShuffleEntry> it = reader.read(START_POSITION, END_POSITION);
assertThat(it.hasNext(), equalTo(Boolean.TRUE));
assertThat(it.next(), equalTo(e1));
@@ -104,9 +102,9 @@ public final class BatchingShuffleEntryReaderTest {
ShuffleEntry e2 = new ShuffleEntry(KEY, SKEY, VALUE);
List<ShuffleEntry> e2s = Collections.singletonList(e2);
when(batchReader.read(START_POSITION, END_POSITION))
- .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, e1.length()));
+ .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION));
when(batchReader.read(NEXT_START_POSITION, END_POSITION))
- .thenReturn(new ShuffleBatchReader.Batch(e2s, null, e2.length()));
+ .thenReturn(new ShuffleBatchReader.Batch(e2s, null));
List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
assertThat(results, contains(e1, e2));
@@ -122,11 +120,11 @@ public final class BatchingShuffleEntryReaderTest {
ShuffleEntry e3 = new ShuffleEntry(KEY, SKEY, VALUE);
List<ShuffleEntry> e3s = Collections.singletonList(e3);
when(batchReader.read(START_POSITION, END_POSITION))
- .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, 0));
+ .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION));
when(batchReader.read(NEXT_START_POSITION, END_POSITION))
- .thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION, 0));
+ .thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION));
when(batchReader.read(SECOND_NEXT_START_POSITION, END_POSITION))
- .thenReturn(new ShuffleBatchReader.Batch(e3s, null, e3.length()));
+ .thenReturn(new ShuffleBatchReader.Batch(e3s, null));
List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
assertThat(results, contains(e3));
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
index 27dbc1d..88db3b2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.dataflow.worker.util.common.worker;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -37,7 +37,7 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public final class CachingShuffleBatchReaderTest {
private final ShuffleBatchReader.Batch testBatch =
- new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null, 0);
+ new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null);
@Test
public void readerShouldCacheReads() throws IOException {