You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/25 15:51:11 UTC
[1/2] beam git commit: ByteKey: remove ByteString from public API, replace with ByteBuffer
Repository: beam
Updated Branches:
refs/heads/master 6aed130cc -> 000378d6a
ByteKey: remove ByteString from public API, replace with ByteBuffer
* BigtableIO: use ByteBuffer not ByteString
* HBaseIO: replace ByteString with byte[]
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1efaa1e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1efaa1e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1efaa1e3
Branch: refs/heads/master
Commit: 1efaa1e37976a801831f72b8a7c000bdcf6d3cd8
Parents: 6aed130
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 13 15:27:57 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 25 08:50:49 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/range/ByteKey.java | 21 ++++++++--------
.../beam/sdk/io/range/ByteKeyRangeTest.java | 16 ++++++------
.../apache/beam/sdk/io/range/ByteKeyTest.java | 4 +--
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 12 ++++++---
.../io/gcp/bigtable/BigtableServiceImpl.java | 4 +--
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 12 ++++++---
sdks/java/io/hbase/pom.xml | 5 ----
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 26 +++++++++-----------
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 16 ++++++------
9 files changed, 58 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
index e4129ff..cd2377b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteString.ByteIterator;
import java.io.Serializable;
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
/**
* A class representing a key consisting of an array of bytes. Arbitrary-length
@@ -43,10 +45,11 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
public static final ByteKey EMPTY = ByteKey.of();
/**
- * Creates a new {@link ByteKey} backed by the specified {@link ByteString}.
+ * Creates a new {@link ByteKey} backed by a copy of the data remaining in the specified
+ * {@link ByteBuffer}.
*/
- public static ByteKey of(ByteString value) {
- return new ByteKey(value);
+ public static ByteKey copyFrom(ByteBuffer value) {
+ return new ByteKey(ByteString.copyFrom(value));
}
/**
@@ -55,7 +58,7 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
* <p>Makes a copy of the underlying array.
*/
public static ByteKey copyFrom(byte[] bytes) {
- return of(ByteString.copyFrom(bytes));
+ return new ByteKey(ByteString.copyFrom(bytes));
}
/**
@@ -78,12 +81,10 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
}
/**
- * Returns an immutable {@link ByteString} representing this {@link ByteKey}.
- *
- * <p>Does not copy.
+ * Returns a read-only {@link ByteBuffer} representing this {@link ByteKey}.
*/
- public ByteString getValue() {
- return value;
+ public ByteBuffer getValue() {
+ return value.asReadOnlyByteBuffer();
}
/**
@@ -109,7 +110,7 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
* size.
*/
@Override
- public int compareTo(ByteKey other) {
+ public int compareTo(@Nonnull ByteKey other) {
checkNotNull(other, "other");
ByteIterator thisIt = value.iterator();
ByteIterator otherIt = other.value.iterator();
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
index 40f6d8f..dd55970 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
@@ -375,19 +375,19 @@ public class ByteKeyRangeTest {
/** Asserts the two keys are equal except trailing zeros. */
private static void assertEqualExceptPadding(ByteKey expected, ByteKey key) {
- ByteString shortKey = expected.getValue();
- ByteString longKey = key.getValue();
- if (shortKey.size() > longKey.size()) {
+ ByteBuffer shortKey = expected.getValue();
+ ByteBuffer longKey = key.getValue();
+ if (shortKey.remaining() > longKey.remaining()) {
shortKey = key.getValue();
longKey = expected.getValue();
}
- for (int i = 0; i < shortKey.size(); ++i) {
- if (shortKey.byteAt(i) != longKey.byteAt(i)) {
+ for (int i = 0; i < shortKey.remaining(); ++i) {
+ if (shortKey.get(i) != longKey.get(i)) {
fail(String.format("Expected %s (up to trailing zeros), got %s", expected, key));
}
}
- for (int j = shortKey.size(); j < longKey.size(); ++j) {
- if (longKey.byteAt(j) != 0) {
+ for (int j = shortKey.remaining(); j < longKey.remaining(); ++j) {
+ if (longKey.get(j) != 0) {
fail(String.format("Expected %s (up to trailing zeros), got %s", expected, key));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
index 1117ac7..6ddfa1d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
@@ -111,7 +111,7 @@ public class ByteKeyTest {
assertTrue(String.format("Expected that %s is equal to itself.", left), eq);
assertTrue(
String.format("Expected that %s is equal to a copy of itself.", left),
- left.equals(ByteKey.of(right.getValue())));
+ left.equals(ByteKey.copyFrom(right.getValue())));
} else {
assertFalse(String.format("Expected that %s is not equal to %s", left, right), eq);
}
@@ -128,7 +128,7 @@ public class ByteKeyTest {
int collisions = 0;
for (int i = 0; i < TEST_KEYS.length; ++i) {
int left = TEST_KEYS[i].hashCode();
- int leftClone = ByteKey.of(TEST_KEYS[i].getValue()).hashCode();
+ int leftClone = ByteKey.copyFrom(TEST_KEYS[i].getValue()).hashCode();
assertEquals(
String.format("Expected same hash code for %s and a copy of itself", TEST_KEYS[i]),
left,
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 7bba1a6..503be18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -664,6 +664,10 @@ public class BigtableIO {
/** Disallow construction of utility class. */
private BigtableIO() {}
+ private static ByteKey makeByteKey(ByteString key) {
+ return ByteKey.copyFrom(key.asReadOnlyByteBuffer());
+ }
+
static class BigtableSource extends BoundedSource<Row> {
public BigtableSource(
SerializableFunction<PipelineOptions, BigtableService> serviceFactory,
@@ -759,7 +763,7 @@ public class BigtableIO {
long lastOffset = 0;
ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
for (SampleRowKeysResponse response : sampleRowKeys) {
- ByteKey responseEndKey = ByteKey.of(response.getRowKey());
+ ByteKey responseEndKey = makeByteKey(response.getRowKey());
long responseOffset = response.getOffsetBytes();
checkState(
responseOffset >= lastOffset,
@@ -837,7 +841,7 @@ public class BigtableIO {
// TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
// filter or to sample on a given key range.
for (SampleRowKeysResponse response : samples) {
- ByteKey currentEndKey = ByteKey.of(response.getRowKey());
+ ByteKey currentEndKey = makeByteKey(response.getRowKey());
long currentOffset = response.getOffsetBytes();
if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
// Skip an empty region.
@@ -950,7 +954,7 @@ public class BigtableIO {
reader = service.createReader(getCurrentSource());
boolean hasRecord =
reader.start()
- && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()))
+ && rangeTracker.tryReturnRecordAt(true, makeByteKey(reader.getCurrentRow().getKey()))
|| rangeTracker.markDone();
if (hasRecord) {
++recordsReturned;
@@ -967,7 +971,7 @@ public class BigtableIO {
public boolean advance() throws IOException {
boolean hasRecord =
reader.advance()
- && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()))
+ && rangeTracker.tryReturnRecordAt(true, makeByteKey(reader.getCurrentRow().getKey()))
|| rangeTracker.markDone();
if (hasRecord) {
++recordsReturned;
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 1a4937c..90102c8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -117,8 +117,8 @@ class BigtableServiceImpl implements BigtableService {
public boolean start() throws IOException {
RowRange range =
RowRange.newBuilder()
- .setStartKeyClosed(source.getRange().getStartKey().getValue())
- .setEndKeyOpen(source.getRange().getEndKey().getValue())
+ .setStartKeyClosed(ByteString.copyFrom(source.getRange().getStartKey().getValue()))
+ .setEndKeyOpen(ByteString.copyFrom(source.getRange().getEndKey().getValue()))
.build();
RowSet rowSet = RowSet.newBuilder()
.addRowRanges(range)
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cf96b65..43957e3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -148,6 +148,10 @@ public class BigtableIOTest {
bigtableCoder = p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
}
+ private static ByteKey makeByteKey(ByteString key) {
+ return ByteKey.copyFrom(key.asReadOnlyByteBuffer());
+ }
+
@Test
public void testReadBuildsCorrectly() {
BigtableIO.Read read =
@@ -328,7 +332,7 @@ public class BigtableIOTest {
@Override
public boolean apply(@Nullable Row input) {
verifyNotNull(input, "input");
- return range.containsKey(ByteKey.of(input.getKey()));
+ return range.containsKey(makeByteKey(input.getKey()));
}
}));
}
@@ -790,7 +794,7 @@ public class BigtableIOTest {
public ByteKeyRange getTableRange(String tableId) {
verifyTableExists(tableId);
SortedMap<ByteString, ByteString> data = tables.get(tableId);
- return ByteKeyRange.of(ByteKey.of(data.firstKey()), ByteKey.of(data.lastKey()));
+ return ByteKeyRange.of(makeByteKey(data.firstKey()), makeByteKey(data.lastKey()));
}
public void createTable(String tableId) {
@@ -890,7 +894,7 @@ public class BigtableIOTest {
while (rows.hasNext()) {
entry = rows.next();
if (!filter.apply(entry.getKey())
- || !source.getRange().containsKey(ByteKey.of(entry.getKey()))) {
+ || !source.getRange().containsKey(makeByteKey(entry.getKey()))) {
// Does not match row filter or does not match source range. Skip.
entry = null;
continue;
@@ -969,7 +973,7 @@ public class BigtableIOTest {
private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
@Override
public int compare(ByteString o1, ByteString o2) {
- return ByteKey.of(o1).compareTo(ByteKey.of(o2));
+ return makeByteKey(o1).compareTo(makeByteKey(o2));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index d8cb95f..0798efc 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -100,11 +100,6 @@
</dependency>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index ccdcef6..1c8afbd 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -20,8 +20,6 @@ package org.apache.beam.sdk.io.hbase;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.protobuf.ByteString;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,9 +30,8 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
import javax.annotation.Nullable;
-
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.ByteStringCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -52,7 +49,6 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLoad;
@@ -126,8 +122,8 @@ import org.slf4j.LoggerFactory;
* <h3>Writing to HBase</h3>
*
* <p>The HBase sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
+ * {@link PCollection PCollection<KV<byte[], Iterable<Mutation>>>}, where the
+ * {@code byte[]} is the key of the row being mutated, and each {@link Mutation} represents an
* idempotent transformation to that row.
*
* <p>To configure a HBase sink, you must supply a table id and a {@link Configuration}
@@ -135,7 +131,7 @@ import org.slf4j.LoggerFactory;
*
* <pre>{@code
* Configuration configuration = ...;
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
+ * PCollection<KV<byte[], Iterable<Mutation>>> data = ...;
* data.setCoder(HBaseIO.WRITE_CODER);
*
* data.apply("write",
@@ -549,7 +545,7 @@ public class HBaseIO {
* @see HBaseIO
*/
public static class Write
- extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
+ extends PTransform<PCollection<KV<byte[], Iterable<Mutation>>>, PDone> {
/**
* Returns a new {@link HBaseIO.Write} that will write to the HBase instance
@@ -578,13 +574,13 @@ public class HBaseIO {
}
@Override
- public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+ public PDone expand(PCollection<KV<byte[], Iterable<Mutation>>> input) {
input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
return PDone.in(input.getPipeline());
}
@Override
- public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+ public void validate(PCollection<KV<byte[], Iterable<Mutation>>> input) {
checkArgument(serializableConfiguration != null, "Configuration not specified");
checkArgument(!tableId.isEmpty(), "Table ID not specified");
try (Connection connection = ConnectionFactory.createConnection(
@@ -616,7 +612,7 @@ public class HBaseIO {
private final String tableId;
private final SerializableConfiguration serializableConfiguration;
- private class HBaseWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+ private class HBaseWriterFn extends DoFn<KV<byte[], Iterable<Mutation>>, Void> {
public HBaseWriterFn(String tableId,
SerializableConfiguration serializableConfiguration) {
@@ -645,7 +641,7 @@ public class HBaseIO {
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
- KV<ByteString, Iterable<Mutation>> record = ctx.element();
+ KV<byte[], Iterable<Mutation>> record = ctx.element();
List<Mutation> mutations = new ArrayList<>();
for (Mutation mutation : record.getValue()) {
mutations.add(mutation);
@@ -687,6 +683,6 @@ public class HBaseIO {
}
}
- public static final Coder<KV<ByteString, Iterable<Mutation>>> WRITE_CODER =
- KvCoder.of(ByteStringCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
+ public static final Coder<KV<byte[], Iterable<Mutation>>> WRITE_CODER =
+ KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index c2410ea..bf8cb4b 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
-import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
@@ -284,7 +284,7 @@ public class HBaseIOTest {
public void testWritingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE";
- PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+ PCollection<KV<byte[], Iterable<Mutation>>> emptyInput =
p.apply(Create.empty(HBaseIO.WRITE_CODER));
// Exception will be thrown by write.validate() when write is applied.
@@ -380,8 +380,8 @@ public class HBaseIOTest {
// Beam helper methods
/** Helper function to make a single row mutation to be written. */
- private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
- ByteString rowKey = ByteString.copyFromUtf8(key);
+ private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) {
+ byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
List<Mutation> mutations = new ArrayList<>();
mutations.add(makeMutation(key, value));
return KV.of(rowKey, (Iterable<Mutation>) mutations);
@@ -389,17 +389,17 @@ public class HBaseIOTest {
private static Mutation makeMutation(String key, String value) {
- ByteString rowKey = ByteString.copyFromUtf8(key);
- return new Put(rowKey.toByteArray())
+ byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
+ return new Put(rowKey)
.addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value))
.addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com"));
}
- private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
+ private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) {
Put put = new Put(key.getBytes());
List<Mutation> mutations = new ArrayList<>();
mutations.add(put);
- return KV.of(ByteString.copyFromUtf8(key), (Iterable<Mutation>) mutations);
+ return KV.of(key.getBytes(StandardCharsets.UTF_8), (Iterable<Mutation>) mutations);
}
private void runReadTest(HBaseIO.Read read, List<Result> expected) {
[2/2] beam git commit: This closes #2531
Posted by dh...@apache.org.
This closes #2531
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/000378d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/000378d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/000378d6
Branch: refs/heads/master
Commit: 000378d6aafcc45bf10bcda78a90e13ab1d36f1c
Parents: 6aed130 1efaa1e
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 25 08:50:59 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 25 08:50:59 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/range/ByteKey.java | 21 ++++++++--------
.../beam/sdk/io/range/ByteKeyRangeTest.java | 16 ++++++------
.../apache/beam/sdk/io/range/ByteKeyTest.java | 4 +--
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 12 ++++++---
.../io/gcp/bigtable/BigtableServiceImpl.java | 4 +--
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 12 ++++++---
sdks/java/io/hbase/pom.xml | 5 ----
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 26 +++++++++-----------
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 16 ++++++------
9 files changed, 58 insertions(+), 58 deletions(-)
----------------------------------------------------------------------