You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/31 05:03:08 UTC
[1/2] beam git commit: Fix HDFSFileSource’s split size estimate
Repository: beam
Updated Branches:
refs/heads/master bcc2806c0 -> 4e425ca1b
Fix HDFSFileSource’s split size estimate
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/086e1674
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/086e1674
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/086e1674
Branch: refs/heads/master
Commit: 086e1674641be37aafd1235f2ac2db2da012376b
Parents: bcc2806
Author: Igor Bernstein <ig...@google.com>
Authored: Sun Jan 29 00:00:02 2017 -0500
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 30 21:02:55 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 10 +++++++++
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 23 ++++++++++++++++++++
2 files changed, 33 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/086e1674/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 61660de..1affb4a 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -237,7 +237,14 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
@Override
public long getEstimatedSizeBytes(PipelineOptions options) {
long size = 0;
+
try {
+ // If this source represents a split from splitIntoBundles, then return the size of the split,
+ // rather than the entire input
+ if (serializableSplit != null) {
+ return serializableSplit.getSplit().getLength();
+ }
+
Job job = Job.getInstance(); // new instance
for (FileStatus st : listStatus(createFormat(job), job)) {
size += st.getLen();
@@ -245,6 +252,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
} catch (IOException | NoSuchMethodException | InvocationTargetException
| IllegalAccessException | InstantiationException e) {
// ignore, and return 0
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // ignore, and return 0
}
return size;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/086e1674/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index 6145952..4c3f1ce 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -151,6 +151,29 @@ public class HDFSFileSourceTest {
assertTrue(nonEmptySplits > 2);
}
+ @Test
+ public void testSplitEstimatedSize() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+ File file = createFileWithData("tmp.avro", expectedResults);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ long originalSize = source.getEstimatedSizeBytes(options);
+ long splitTotalSize = 0;
+ List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+ SequenceFile.SYNC_INTERVAL, options
+ );
+ for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+ splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+ }
+ // Assert that the estimated size of the whole is the sum of its parts
+ assertEquals(originalSize, splitTotalSize);
+ }
+
private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
throws IOException {
File tmpFile = tmpFolder.newFile(filename);
[2/2] beam git commit: This closes #1867
Posted by dh...@apache.org.
This closes #1867
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e425ca1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e425ca1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e425ca1
Branch: refs/heads/master
Commit: 4e425ca1ba27a6051d6104d06d5f460b36b14afe
Parents: bcc2806 086e167
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jan 30 21:02:59 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 30 21:02:59 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 10 +++++++++
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 23 ++++++++++++++++++++
2 files changed, 33 insertions(+)
----------------------------------------------------------------------