You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/25 15:51:11 UTC

[1/2] beam git commit: ByteKey: remove ByteString from public API, replace with ByteBuffer

Repository: beam
Updated Branches:
  refs/heads/master 6aed130cc -> 000378d6a


ByteKey: remove ByteString from public API, replace with ByteBuffer

* BigtableIO: use ByteBuffer not ByteString
* HBaseIO: replace ByteString with byte[]


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1efaa1e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1efaa1e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1efaa1e3

Branch: refs/heads/master
Commit: 1efaa1e37976a801831f72b8a7c000bdcf6d3cd8
Parents: 6aed130
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 13 15:27:57 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 25 08:50:49 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/range/ByteKey.java   | 21 ++++++++--------
 .../beam/sdk/io/range/ByteKeyRangeTest.java     | 16 ++++++------
 .../apache/beam/sdk/io/range/ByteKeyTest.java   |  4 +--
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 12 ++++++---
 .../io/gcp/bigtable/BigtableServiceImpl.java    |  4 +--
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 12 ++++++---
 sdks/java/io/hbase/pom.xml                      |  5 ----
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 26 +++++++++-----------
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 16 ++++++------
 9 files changed, 58 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
index e4129ff..cd2377b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString.ByteIterator;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
 
 /**
  * A class representing a key consisting of an array of bytes. Arbitrary-length
@@ -43,10 +45,11 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
   public static final ByteKey EMPTY = ByteKey.of();
 
   /**
-   * Creates a new {@link ByteKey} backed by the specified {@link ByteString}.
+   * Creates a new {@link ByteKey} backed by a copy of the data remaining in the specified
+   * {@link ByteBuffer}.
    */
-  public static ByteKey of(ByteString value) {
-    return new ByteKey(value);
+  public static ByteKey copyFrom(ByteBuffer value) {
+    return new ByteKey(ByteString.copyFrom(value));
   }
 
   /**
@@ -55,7 +58,7 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
    * <p>Makes a copy of the underlying array.
    */
   public static ByteKey copyFrom(byte[] bytes) {
-    return of(ByteString.copyFrom(bytes));
+    return new ByteKey(ByteString.copyFrom(bytes));
   }
 
   /**
@@ -78,12 +81,10 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
   }
 
   /**
-   * Returns an immutable {@link ByteString} representing this {@link ByteKey}.
-   *
-   * <p>Does not copy.
+   * Returns a read-only {@link ByteBuffer} representing this {@link ByteKey}.
    */
-  public ByteString getValue() {
-    return value;
+  public ByteBuffer getValue() {
+    return value.asReadOnlyByteBuffer();
   }
 
   /**
@@ -109,7 +110,7 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable {
    * size.
    */
   @Override
-  public int compareTo(ByteKey other) {
+  public int compareTo(@Nonnull ByteKey other) {
     checkNotNull(other, "other");
     ByteIterator thisIt = value.iterator();
     ByteIterator otherIt = other.value.iterator();

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
index 40f6d8f..dd55970 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import org.junit.Test;
@@ -375,19 +375,19 @@ public class ByteKeyRangeTest {
 
   /** Asserts the two keys are equal except trailing zeros. */
   private static void assertEqualExceptPadding(ByteKey expected, ByteKey key) {
-    ByteString shortKey = expected.getValue();
-    ByteString longKey = key.getValue();
-    if (shortKey.size() > longKey.size()) {
+    ByteBuffer shortKey = expected.getValue();
+    ByteBuffer longKey = key.getValue();
+    if (shortKey.remaining() > longKey.remaining()) {
       shortKey = key.getValue();
       longKey = expected.getValue();
     }
-    for (int i = 0; i < shortKey.size(); ++i) {
-      if (shortKey.byteAt(i) != longKey.byteAt(i)) {
+    for (int i = 0; i < shortKey.remaining(); ++i) {
+      if (shortKey.get(i) != longKey.get(i)) {
         fail(String.format("Expected %s (up to trailing zeros), got %s", expected, key));
       }
     }
-    for (int j = shortKey.size(); j < longKey.size(); ++j) {
-      if (longKey.byteAt(j) != 0) {
+    for (int j = shortKey.remaining(); j < longKey.remaining(); ++j) {
+      if (longKey.get(j) != 0) {
         fail(String.format("Expected %s (up to trailing zeros), got %s", expected, key));
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
index 1117ac7..6ddfa1d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java
@@ -111,7 +111,7 @@ public class ByteKeyTest {
           assertTrue(String.format("Expected that %s is equal to itself.", left), eq);
           assertTrue(
               String.format("Expected that %s is equal to a copy of itself.", left),
-              left.equals(ByteKey.of(right.getValue())));
+              left.equals(ByteKey.copyFrom(right.getValue())));
         } else {
           assertFalse(String.format("Expected that %s is not equal to %s", left, right), eq);
         }
@@ -128,7 +128,7 @@ public class ByteKeyTest {
     int collisions = 0;
     for (int i = 0; i < TEST_KEYS.length; ++i) {
       int left = TEST_KEYS[i].hashCode();
-      int leftClone = ByteKey.of(TEST_KEYS[i].getValue()).hashCode();
+      int leftClone = ByteKey.copyFrom(TEST_KEYS[i].getValue()).hashCode();
       assertEquals(
           String.format("Expected same hash code for %s and a copy of itself", TEST_KEYS[i]),
           left,

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 7bba1a6..503be18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -664,6 +664,10 @@ public class BigtableIO {
   /** Disallow construction of utility class. */
   private BigtableIO() {}
 
+  private static ByteKey makeByteKey(ByteString key) {
+    return ByteKey.copyFrom(key.asReadOnlyByteBuffer());
+  }
+
   static class BigtableSource extends BoundedSource<Row> {
     public BigtableSource(
         SerializableFunction<PipelineOptions, BigtableService> serviceFactory,
@@ -759,7 +763,7 @@ public class BigtableIO {
       long lastOffset = 0;
       ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
       for (SampleRowKeysResponse response : sampleRowKeys) {
-        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
+        ByteKey responseEndKey = makeByteKey(response.getRowKey());
         long responseOffset = response.getOffsetBytes();
         checkState(
             responseOffset >= lastOffset,
@@ -837,7 +841,7 @@ public class BigtableIO {
       // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
       // filter or to sample on a given key range.
       for (SampleRowKeysResponse response : samples) {
-        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
+        ByteKey currentEndKey = makeByteKey(response.getRowKey());
         long currentOffset = response.getOffsetBytes();
         if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
           // Skip an empty region.
@@ -950,7 +954,7 @@ public class BigtableIO {
       reader = service.createReader(getCurrentSource());
       boolean hasRecord =
           reader.start()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()))
+              && rangeTracker.tryReturnRecordAt(true, makeByteKey(reader.getCurrentRow().getKey()))
               || rangeTracker.markDone();
       if (hasRecord) {
         ++recordsReturned;
@@ -967,7 +971,7 @@ public class BigtableIO {
     public boolean advance() throws IOException {
       boolean hasRecord =
           reader.advance()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()))
+              && rangeTracker.tryReturnRecordAt(true, makeByteKey(reader.getCurrentRow().getKey()))
               || rangeTracker.markDone();
       if (hasRecord) {
         ++recordsReturned;

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 1a4937c..90102c8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -117,8 +117,8 @@ class BigtableServiceImpl implements BigtableService {
     public boolean start() throws IOException {
       RowRange range =
           RowRange.newBuilder()
-              .setStartKeyClosed(source.getRange().getStartKey().getValue())
-              .setEndKeyOpen(source.getRange().getEndKey().getValue())
+              .setStartKeyClosed(ByteString.copyFrom(source.getRange().getStartKey().getValue()))
+              .setEndKeyOpen(ByteString.copyFrom(source.getRange().getEndKey().getValue()))
               .build();
       RowSet rowSet = RowSet.newBuilder()
           .addRowRanges(range)

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cf96b65..43957e3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -148,6 +148,10 @@ public class BigtableIOTest {
     bigtableCoder = p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
   }
 
+  private static ByteKey makeByteKey(ByteString key) {
+    return ByteKey.copyFrom(key.asReadOnlyByteBuffer());
+  }
+
   @Test
   public void testReadBuildsCorrectly() {
     BigtableIO.Read read =
@@ -328,7 +332,7 @@ public class BigtableIOTest {
           @Override
           public boolean apply(@Nullable Row input) {
             verifyNotNull(input, "input");
-            return range.containsKey(ByteKey.of(input.getKey()));
+            return range.containsKey(makeByteKey(input.getKey()));
           }
         }));
   }
@@ -790,7 +794,7 @@ public class BigtableIOTest {
     public ByteKeyRange getTableRange(String tableId) {
       verifyTableExists(tableId);
       SortedMap<ByteString, ByteString> data = tables.get(tableId);
-      return ByteKeyRange.of(ByteKey.of(data.firstKey()), ByteKey.of(data.lastKey()));
+      return ByteKeyRange.of(makeByteKey(data.firstKey()), makeByteKey(data.lastKey()));
     }
 
     public void createTable(String tableId) {
@@ -890,7 +894,7 @@ public class BigtableIOTest {
       while (rows.hasNext()) {
         entry = rows.next();
         if (!filter.apply(entry.getKey())
-            || !source.getRange().containsKey(ByteKey.of(entry.getKey()))) {
+            || !source.getRange().containsKey(makeByteKey(entry.getKey()))) {
           // Does not match row filter or does not match source range. Skip.
           entry = null;
           continue;
@@ -969,7 +973,7 @@ public class BigtableIOTest {
   private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
     @Override
     public int compare(ByteString o1, ByteString o2) {
-      return ByteKey.of(o1).compareTo(ByteKey.of(o2));
+      return makeByteKey(o1).compareTo(makeByteKey(o2));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index d8cb95f..0798efc 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -100,11 +100,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index ccdcef6..1c8afbd 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -20,8 +20,6 @@ package org.apache.beam.sdk.io.hbase;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.protobuf.ByteString;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,9 +30,8 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.ByteStringCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -52,7 +49,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLoad;
@@ -126,8 +122,8 @@ import org.slf4j.LoggerFactory;
  * <h3>Writing to HBase</h3>
  *
  * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection&lt;KV&lt;ByteString, Iterable&lt;Mutation&gt;&gt;&gt;}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
+ * {@link PCollection PCollection&lt;KV&lt;byte[], Iterable&lt;Mutation&gt;&gt;&gt;}, where the
+ * {@code byte[]} is the key of the row being mutated, and each {@link Mutation} represents an
  * idempotent transformation to that row.
  *
  * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration}
@@ -135,7 +131,7 @@ import org.slf4j.LoggerFactory;
  *
  * <pre>{@code
  * Configuration configuration = ...;
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
+ * PCollection<KV<byte[], Iterable<Mutation>>> data = ...;
  * data.setCoder(HBaseIO.WRITE_CODER);
  *
  * data.apply("write",
@@ -549,7 +545,7 @@ public class HBaseIO {
      * @see HBaseIO
      */
     public static class Write
-            extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
+            extends PTransform<PCollection<KV<byte[], Iterable<Mutation>>>, PDone> {
 
         /**
          * Returns a new {@link HBaseIO.Write} that will write to the HBase instance
@@ -578,13 +574,13 @@ public class HBaseIO {
         }
 
         @Override
-        public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+        public PDone expand(PCollection<KV<byte[], Iterable<Mutation>>> input) {
             input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
             return PDone.in(input.getPipeline());
         }
 
         @Override
-        public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+        public void validate(PCollection<KV<byte[], Iterable<Mutation>>> input) {
             checkArgument(serializableConfiguration != null, "Configuration not specified");
             checkArgument(!tableId.isEmpty(), "Table ID not specified");
             try (Connection connection = ConnectionFactory.createConnection(
@@ -616,7 +612,7 @@ public class HBaseIO {
         private final String tableId;
         private final SerializableConfiguration serializableConfiguration;
 
-        private class HBaseWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+        private class HBaseWriterFn extends DoFn<KV<byte[], Iterable<Mutation>>, Void> {
 
             public HBaseWriterFn(String tableId,
                                  SerializableConfiguration serializableConfiguration) {
@@ -645,7 +641,7 @@ public class HBaseIO {
 
             @ProcessElement
             public void processElement(ProcessContext ctx) throws Exception {
-                KV<ByteString, Iterable<Mutation>> record = ctx.element();
+                KV<byte[], Iterable<Mutation>> record = ctx.element();
                 List<Mutation> mutations = new ArrayList<>();
                 for (Mutation mutation : record.getValue()) {
                     mutations.add(mutation);
@@ -687,6 +683,6 @@ public class HBaseIO {
         }
     }
 
-    public static final Coder<KV<ByteString, Iterable<Mutation>>> WRITE_CODER =
-            KvCoder.of(ByteStringCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
+    public static final Coder<KV<byte[], Iterable<Mutation>>> WRITE_CODER =
+            KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index c2410ea..bf8cb4b 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
-import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
@@ -284,7 +284,7 @@ public class HBaseIOTest {
     public void testWritingFailsTableDoesNotExist() throws Exception {
         final String table = "TEST-TABLE";
 
-        PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+        PCollection<KV<byte[], Iterable<Mutation>>> emptyInput =
                 p.apply(Create.empty(HBaseIO.WRITE_CODER));
 
         // Exception will be thrown by write.validate() when write is applied.
@@ -380,8 +380,8 @@ public class HBaseIOTest {
 
     // Beam helper methods
     /** Helper function to make a single row mutation to be written. */
-    private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
-        ByteString rowKey = ByteString.copyFromUtf8(key);
+    private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) {
+        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
         List<Mutation> mutations = new ArrayList<>();
         mutations.add(makeMutation(key, value));
         return KV.of(rowKey, (Iterable<Mutation>) mutations);
@@ -389,17 +389,17 @@ public class HBaseIOTest {
 
 
     private static Mutation makeMutation(String key, String value) {
-        ByteString rowKey = ByteString.copyFromUtf8(key);
-        return new Put(rowKey.toByteArray())
+        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
+        return new Put(rowKey)
                     .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value))
                     .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com"));
     }
 
-    private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
+    private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) {
         Put put = new Put(key.getBytes());
         List<Mutation> mutations = new ArrayList<>();
         mutations.add(put);
-        return KV.of(ByteString.copyFromUtf8(key), (Iterable<Mutation>) mutations);
+        return KV.of(key.getBytes(StandardCharsets.UTF_8), (Iterable<Mutation>) mutations);
     }
 
     private void runReadTest(HBaseIO.Read read, List<Result> expected) {


[2/2] beam git commit: This closes #2531

Posted by dh...@apache.org.
This closes #2531


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/000378d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/000378d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/000378d6

Branch: refs/heads/master
Commit: 000378d6aafcc45bf10bcda78a90e13ab1d36f1c
Parents: 6aed130 1efaa1e
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 25 08:50:59 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 25 08:50:59 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/range/ByteKey.java   | 21 ++++++++--------
 .../beam/sdk/io/range/ByteKeyRangeTest.java     | 16 ++++++------
 .../apache/beam/sdk/io/range/ByteKeyTest.java   |  4 +--
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 12 ++++++---
 .../io/gcp/bigtable/BigtableServiceImpl.java    |  4 +--
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 12 ++++++---
 sdks/java/io/hbase/pom.xml                      |  5 ----
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 26 +++++++++-----------
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 16 ++++++------
 9 files changed, 58 insertions(+), 58 deletions(-)
----------------------------------------------------------------------