You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/25 07:23:17 UTC

[1/2] beam git commit: [BEAM-1531] Add dynamic work rebalancing support for HBaseIO

Repository: beam
Updated Branches:
  refs/heads/master 20d88dbfc -> f4b66f3a0


[BEAM-1531] Add dynamic work rebalancing support for HBaseIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e951baab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e951baab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e951baab

Branch: refs/heads/master
Commit: e951baab29100236983407b5107fbf5c4a49908e
Parents: 20d88db
Author: Ismaël Mejía <ie...@apache.org>
Authored: Mon May 22 07:51:13 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Fri Aug 25 09:22:45 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 81 +++++++++++++++++++-
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 56 +++++++++++++-
 2 files changed, 132 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e951baab/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 2ba6826..7f58cef 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -303,6 +304,22 @@ public class HBaseIO {
             this.estimatedSizeBytes = estimatedSizeBytes;
         }
 
+        HBaseSource withStartKey(ByteKey startKey) throws IOException {
+            checkNotNull(startKey, "startKey");
+            Read newRead = new Read(read.serializableConfiguration, read.tableId,
+                new SerializableScan(
+                    new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes())));
+            return new HBaseSource(newRead, estimatedSizeBytes);
+        }
+
+        HBaseSource withEndKey(ByteKey endKey) throws IOException {
+            checkNotNull(endKey, "endKey");
+            Read newRead = new Read(read.serializableConfiguration, read.tableId,
+                new SerializableScan(
+                    new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes())));
+            return new HBaseSource(newRead, estimatedSizeBytes);
+        }
+
         @Override
         public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
             if (estimatedSizeBytes == null) {
@@ -463,19 +480,25 @@ public class HBaseIO {
     }
 
     private static class HBaseReader extends BoundedSource.BoundedReader<Result> {
-        private final HBaseSource source;
+        private HBaseSource source;
         private Connection connection;
         private ResultScanner scanner;
         private Iterator<Result> iter;
         private Result current;
+        private final ByteKeyRangeTracker rangeTracker;
         private long recordsReturned;
 
         HBaseReader(HBaseSource source) {
             this.source = source;
+            Scan scan = source.read.serializableScan.get();
+            ByteKeyRange range = ByteKeyRange
+                .of(ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow()));
+            rangeTracker = ByteKeyRangeTracker.of(range);
         }
 
         @Override
         public boolean start() throws IOException {
+            HBaseSource source = getCurrentSource();
             Configuration configuration = source.read.serializableConfiguration.get();
             String tableId = source.read.tableId;
             connection = ConnectionFactory.createConnection(configuration);
@@ -495,9 +518,15 @@ public class HBaseIO {
 
         @Override
         public boolean advance() throws IOException {
-            boolean hasRecord = iter.hasNext();
+            if (!iter.hasNext()) {
+                return rangeTracker.markDone();
+            }
+            final Result next = iter.next();
+            boolean hasRecord =
+                rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow()))
+                || rangeTracker.markDone();
             if (hasRecord) {
-                current = iter.next();
+                current = next;
                 ++recordsReturned;
             }
             return hasRecord;
@@ -517,9 +546,53 @@ public class HBaseIO {
         }
 
         @Override
-        public BoundedSource<Result> getCurrentSource() {
+        public synchronized HBaseSource getCurrentSource() {
             return source;
         }
+
+        @Override
+        public final Double getFractionConsumed() {
+            return rangeTracker.getFractionConsumed();
+        }
+
+        @Override
+        public final long getSplitPointsConsumed() {
+            return rangeTracker.getSplitPointsConsumed();
+        }
+
+        @Override
+        @Nullable
+        public final synchronized HBaseSource splitAtFraction(double fraction) {
+            ByteKey splitKey;
+            try {
+                splitKey = rangeTracker.getRange().interpolateKey(fraction);
+            } catch (RuntimeException e) {
+                LOG.info("{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(),
+                    fraction, e);
+                return null;
+            }
+            LOG.info(
+                "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
+            HBaseSource primary;
+            HBaseSource residual;
+            try {
+                primary = source.withEndKey(splitKey);
+                residual = source.withStartKey(splitKey);
+            } catch (Exception e) {
+                LOG.info(
+                    "{}: Interpolating for fraction {} yielded invalid split key {}.",
+                    rangeTracker.getRange(),
+                    fraction,
+                    splitKey,
+                    e);
+                return null;
+            }
+            if (!rangeTracker.trySplitAtPosition(splitKey)) {
+                return null;
+            }
+            this.source = primary;
+            return residual;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e951baab/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 806a27f..0b7f203 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -18,6 +18,9 @@
 package org.apache.beam.sdk.io.hbase;
 
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
@@ -83,7 +86,7 @@ public class HBaseIOTest {
     private static HBaseTestingUtility htu;
     private static HBaseAdmin admin;
 
-    private static Configuration conf = HBaseConfiguration.create();
+    private static final Configuration conf = HBaseConfiguration.create();
     private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
     private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
     private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email");
@@ -277,6 +280,57 @@ public class HBaseIOTest {
                 .withKeyRange(startRow, stopRow), 441);
     }
 
+    /**
+     * Tests dynamic work rebalancing exhaustively.
+     */
+    @Test
+    public void testReadingSplitAtFractionExhaustive() throws Exception {
+        final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
+        final int numRows = 7;
+
+        createTable(table);
+        writeData(table, numRows);
+
+        HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */)
+            .withStartKey(ByteKey.of(48)).withEndKey(ByteKey.of(58));
+
+        assertSplitAtFractionExhaustive(source, null);
+    }
+
+    /**
+     * Unit tests of splitAtFraction.
+     */
+    @Test
+    public void testReadingSplitAtFraction() throws Exception {
+        final String table = "TEST-SPLIT-AT-FRACTION";
+        final int numRows = 10;
+
+        createTable(table);
+        writeData(table, numRows);
+
+        HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+        HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
+
+        // The value k is based on the partitioning schema for the data. In this test case
+        // the partitioning is HEX-based, so we start from 1/16 and the value k will be
+        // around 1/256, so the tests are done in steps of approximately k ~= 0.003922.
+        double k = 0.003922;
+
+        assertSplitAtFractionFails(source, 0, k, null /* options */);
+        assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
+        // With 1 item read, all split requests past k will succeed.
+        assertSplitAtFractionSucceedsAndConsistent(source, 1, k, null /* options */);
+        assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
+        // With 3 items read, all split requests past 3k will succeed.
+        assertSplitAtFractionFails(source, 3, 2 * k, null /* options */);
+        assertSplitAtFractionSucceedsAndConsistent(source, 3, 3 * k, null /* options */);
+        assertSplitAtFractionSucceedsAndConsistent(source, 3, 4 * k, null /* options */);
+        // With 6 items read, all split requests past 6k will succeed.
+        assertSplitAtFractionFails(source, 6, 5 * k, null /* options */);
+        assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
+    }
+
     @Test
     public void testReadingDisplayData() {
         HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable");


[2/2] beam git commit: This closes #3754

Posted by ie...@apache.org.
This closes #3754


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4b66f3a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4b66f3a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4b66f3a

Branch: refs/heads/master
Commit: f4b66f3a09e6a2fe19dc49ec45cec5f5c4f08fde
Parents: 20d88db e951baa
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Fri Aug 25 09:22:52 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Fri Aug 25 09:22:52 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 81 +++++++++++++++++++-
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 56 +++++++++++++-
 2 files changed, 132 insertions(+), 5 deletions(-)
----------------------------------------------------------------------