You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/25 07:23:17 UTC
[1/2] beam git commit: [BEAM-1531] Add dynamic work rebalancing support for HBaseIO
Repository: beam
Updated Branches:
refs/heads/master 20d88dbfc -> f4b66f3a0
[BEAM-1531] Add dynamic work rebalancing support for HBaseIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e951baab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e951baab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e951baab
Branch: refs/heads/master
Commit: e951baab29100236983407b5107fbf5c4a49908e
Parents: 20d88db
Author: Ismaël Mejía <ie...@apache.org>
Authored: Mon May 22 07:51:13 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Fri Aug 25 09:22:45 2017 +0200
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 81 +++++++++++++++++++-
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 56 +++++++++++++-
2 files changed, 132 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e951baab/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 2ba6826..7f58cef 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -303,6 +304,22 @@ public class HBaseIO {
this.estimatedSizeBytes = estimatedSizeBytes;
}
+ HBaseSource withStartKey(ByteKey startKey) throws IOException { // Returns a new source over the same table whose scan start row is startKey.
+ checkNotNull(startKey, "startKey");
+ Read newRead = new Read(read.serializableConfiguration, read.tableId,
+ new SerializableScan(
+ new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes()))); // Modifies a new Scan(...) built from the current scan, not the current scan object itself.
+ return new HBaseSource(newRead, estimatedSizeBytes); // NOTE(review): size estimate is reused as-is, not rescaled to the narrower key range.
+ }
+
+ HBaseSource withEndKey(ByteKey endKey) throws IOException { // Returns a new source over the same table whose scan stop row is endKey.
+ checkNotNull(endKey, "endKey");
+ Read newRead = new Read(read.serializableConfiguration, read.tableId,
+ new SerializableScan(
+ new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes()))); // Same build-a-new-Scan pattern as withStartKey, changing only the stop row.
+ return new HBaseSource(newRead, estimatedSizeBytes); // NOTE(review): size estimate is reused as-is, not rescaled to the narrower key range.
+ }
+
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
if (estimatedSizeBytes == null) {
@@ -463,19 +480,25 @@ public class HBaseIO {
}
private static class HBaseReader extends BoundedSource.BoundedReader<Result> {
- private final HBaseSource source;
+ private HBaseSource source;
private Connection connection;
private ResultScanner scanner;
private Iterator<Result> iter;
private Result current;
+ private final ByteKeyRangeTracker rangeTracker;
private long recordsReturned;
HBaseReader(HBaseSource source) {
this.source = source;
+ Scan scan = source.read.serializableScan.get();
+ ByteKeyRange range = ByteKeyRange // Key range tracked by this reader, taken from the scan's start and stop rows.
+ .of(ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow())); // NOTE(review): an empty stop row presumably becomes ByteKey.EMPTY (unbounded end) — confirm against ByteKeyRange docs.
+ rangeTracker = ByteKeyRangeTracker.of(range); // Consulted by advance(), getFractionConsumed(), getSplitPointsConsumed() and splitAtFraction().
}
@Override
public boolean start() throws IOException {
+ HBaseSource source = getCurrentSource();
Configuration configuration = source.read.serializableConfiguration.get();
String tableId = source.read.tableId;
connection = ConnectionFactory.createConnection(configuration);
@@ -495,9 +518,15 @@ public class HBaseIO {
@Override
public boolean advance() throws IOException {
- boolean hasRecord = iter.hasNext();
+ if (!iter.hasNext()) {
+ return rangeTracker.markDone();
+ }
+ final Result next = iter.next();
+ boolean hasRecord =
+ rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow()))
+ || rangeTracker.markDone();
if (hasRecord) {
- current = iter.next();
+ current = next;
++recordsReturned;
}
return hasRecord;
@@ -517,9 +546,53 @@ public class HBaseIO {
}
@Override
- public BoundedSource<Result> getCurrentSource() {
+ public synchronized HBaseSource getCurrentSource() { // synchronized to pair with splitAtFraction(), which reassigns `source` under the same lock.
return source;
}
+
+ @Override
+ public final Double getFractionConsumed() { // Progress is reported directly by the key-range tracker.
+ return rangeTracker.getFractionConsumed();
+ }
+
+ @Override
+ public final long getSplitPointsConsumed() { // Split-point count is reported directly by the key-range tracker.
+ return rangeTracker.getSplitPointsConsumed();
+ }
+
+ @Override
+ @Nullable
+ public final synchronized HBaseSource splitAtFraction(double fraction) { // Returns the residual source on success, or null if the split is rejected.
+ ByteKey splitKey;
+ try {
+ splitKey = rangeTracker.getRange().interpolateKey(fraction); // Map the requested fraction to a key inside the tracked range.
+ } catch (RuntimeException e) {
+ LOG.info("{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(),
+ fraction, e);
+ return null;
+ }
+ LOG.info(
+ "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
+ HBaseSource primary;
+ HBaseSource residual;
+ try {
+ primary = source.withEndKey(splitKey); // Primary: current source with its scan now stopping at splitKey.
+ residual = source.withStartKey(splitKey); // Residual: current source with its scan now starting at splitKey.
+ } catch (Exception e) {
+ LOG.info(
+ "{}: Interpolating for fraction {} yielded invalid split key {}.",
+ rangeTracker.getRange(),
+ fraction,
+ splitKey,
+ e);
+ return null;
+ }
+ if (!rangeTracker.trySplitAtPosition(splitKey)) { // Both sources are built first; the split is committed only if the tracker accepts splitKey.
+ return null;
+ }
+ this.source = primary; // From here on this reader reports the primary source as its current source.
+ return residual;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/e951baab/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 806a27f..0b7f203 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -18,6 +18,9 @@
package org.apache.beam.sdk.io.hbase;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
@@ -83,7 +86,7 @@ public class HBaseIOTest {
private static HBaseTestingUtility htu;
private static HBaseAdmin admin;
- private static Configuration conf = HBaseConfiguration.create();
+ private static final Configuration conf = HBaseConfiguration.create();
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email");
@@ -277,6 +280,57 @@ public class HBaseIOTest {
.withKeyRange(startRow, stopRow), 441);
}
+ /**
+ * Tests dynamic work rebalancing exhaustively on a small table restricted to a narrow key range.
+ */
+ @Test
+ public void testReadingSplitAtFractionExhaustive() throws Exception {
+ final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
+ final int numRows = 7;
+
+ createTable(table);
+ writeData(table, numRows);
+
+ HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+ HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */)
+ .withStartKey(ByteKey.of(48)).withEndKey(ByteKey.of(58)); // Bytes 48 and 58 are ASCII '0' and ':'; presumably this spans single-digit row keys — confirm against writeData.
+
+ assertSplitAtFractionExhaustive(source, null); // No pipeline options (null).
+ }
+
+ /**
+ * Unit tests of splitAtFraction at chosen fractions after reading 0, 1, 3 and 6 items.
+ */
+ @Test
+ public void testReadingSplitAtFraction() throws Exception {
+ final String table = "TEST-SPLIT-AT-FRACTION";
+ final int numRows = 10;
+
+ createTable(table);
+ writeData(table, numRows);
+
+ HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+ HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
+
+ // The value k is based on the partitioning scheme for the data. In this test case
+ // the partitioning is hex-based, so we start from 1/16th and the value k will be
+ // around 1/256, so the tests are done in approximately k ~= 0.003922 steps
+ double k = 0.003922;
+
+ assertSplitAtFractionFails(source, 0, k, null /* options */); // With 0 items read, split requests are expected to fail.
+ assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
+ // With 1 item read, split requests at or past k will succeed.
+ assertSplitAtFractionSucceedsAndConsistent(source, 1, k, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
+ // With 3 items read, requests below 3k fail and requests at or past 3k succeed.
+ assertSplitAtFractionFails(source, 3, 2 * k, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 3, 3 * k, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 3, 4 * k, null /* options */);
+ // With 6 items read, requests below 6k (e.g. 5k) fail; later ones such as 0.7 succeed.
+ assertSplitAtFractionFails(source, 6, 5 * k, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
+ }
+
@Test
public void testReadingDisplayData() {
HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable");
[2/2] beam git commit: This closes #3754
Posted by ie...@apache.org.
This closes #3754
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4b66f3a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4b66f3a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4b66f3a
Branch: refs/heads/master
Commit: f4b66f3a09e6a2fe19dc49ec45cec5f5c4f08fde
Parents: 20d88db e951baa
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Fri Aug 25 09:22:52 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Fri Aug 25 09:22:52 2017 +0200
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 81 +++++++++++++++++++-
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 56 +++++++++++++-
2 files changed, 132 insertions(+), 5 deletions(-)
----------------------------------------------------------------------