You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/23 01:07:39 UTC
[1/2] incubator-beam git commit: Move allowsDynamicSplitting to Reader, and set it in CompressedSource.
Repository: incubator-beam
Updated Branches:
refs/heads/master 8949ec315 -> e0cae9fb6
Move allowsDynamicSplitting to Reader, and set it in CompressedSource.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b7e9a7e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b7e9a7e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b7e9a7e8
Branch: refs/heads/master
Commit: b7e9a7e8fa467641dfe1c19fa5d5f63f4b74a6d0
Parents: 8949ec3
Author: Pei He <pe...@google.com>
Authored: Fri Jun 17 18:24:06 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 22 18:07:16 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 5 +++
.../apache/beam/sdk/io/OffsetBasedSource.java | 26 +++++++--------
.../beam/sdk/io/CompressedSourceTest.java | 35 ++++++++++++++++++++
3 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index a5c54b3..75bfc8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -396,6 +396,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
}
@Override
+ public boolean allowsDynamicSplitting() {
+ return splittable;
+ }
+
+ @Override
public final long getSplitPointsConsumed() {
if (splittable) {
return readerDelegate.getSplitPointsConsumed();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 2f62acd..295eab9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -192,17 +192,6 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
*/
public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);
- /**
- * Whether this source should allow dynamic splitting of the offset ranges.
- *
- * <p>True by default. Override this to return false if the source cannot
- * support dynamic splitting correctly. If this returns false,
- * {@link OffsetBasedSource.OffsetBasedReader#splitAtFraction} will refuse all split requests.
- */
- public boolean allowsDynamicSplitting() {
- return true;
- }
-
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
@@ -342,7 +331,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
// Note that even if the current source does not allow splitting, we don't know that
// it's non-empty so we return UNKNOWN instead of 1.
return BoundedReader.SPLIT_POINTS_UNKNOWN;
- } else if (!getCurrentSource().allowsDynamicSplitting()) {
+ } else if (!allowsDynamicSplitting()) {
// Started (so non-empty) and unsplittable, so only the current task.
return 1;
} else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) {
@@ -355,9 +344,20 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
}
}
+ /**
+ * Whether this reader should allow dynamic splitting of the offset ranges.
+ *
+ * <p>True by default. Override this to return false if the reader cannot
+ * support dynamic splitting correctly. If this returns false,
+ * {@link OffsetBasedReader#splitAtFraction} will refuse all split requests.
+ */
+ public boolean allowsDynamicSplitting() {
+ return true;
+ }
+
@Override
public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
- if (!getCurrentSource().allowsDynamicSplitting()) {
+ if (!allowsDynamicSplitting()) {
return null;
}
if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index abf1de3..8fbed94 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -24,6 +24,8 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -44,6 +46,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.primitives.Bytes;
@@ -633,6 +637,37 @@ public class CompressedSourceTest {
}
@Test
+ public void testUnsplittable() throws IOException {
+ String baseName = "test-input";
+ File compressedFile = tmpFolder.newFile(baseName + ".gz");
+ byte[] input = generateInput(10000);
+ writeFile(compressedFile, input, CompressionMode.GZIP);
+
+ CompressedSource<Byte> source =
+ CompressedSource.from(new ByteSource(compressedFile.getPath(), 1));
+ List<Byte> expected = Lists.newArrayList();
+ for (byte i : input) {
+ expected.add(i);
+ }
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ BoundedReader<Byte> reader = source.createReader(options);
+
+ List<Byte> actual = Lists.newArrayList();
+ for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
+ actual.add(reader.getCurrent());
+ // checkpoint every 9 elements
+ if (actual.size() % 9 == 0) {
+ Double fractionConsumed = reader.getFractionConsumed();
+ assertNotNull(fractionConsumed);
+ assertNull(reader.splitAtFraction(fractionConsumed));
+ }
+ }
+ assertEquals(expected.size(), actual.size());
+ assertEquals(Sets.newHashSet(expected), Sets.newHashSet(actual));
+ }
+
+ @Test
public void testSplittableProgress() throws IOException {
File tmpFile = tmpFolder.newFile("nonempty.txt");
String filename = tmpFile.toPath().toString();
[2/2] incubator-beam git commit: Closes #502
Posted by dh...@apache.org.
Closes #502
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e0cae9fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e0cae9fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e0cae9fb
Branch: refs/heads/master
Commit: e0cae9fb691c53f5aeb1c43c52ff4354d4680f2e
Parents: 8949ec3 b7e9a7e
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jun 22 18:07:17 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 22 18:07:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 5 +++
.../apache/beam/sdk/io/OffsetBasedSource.java | 26 +++++++--------
.../beam/sdk/io/CompressedSourceTest.java | 35 ++++++++++++++++++++
3 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------