Posted to github@beam.apache.org by "Abacn (via GitHub)" <gi...@apache.org> on 2023/03/15 19:53:05 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #25831: Overload HbaseIO with KV

Abacn commented on code in PR #25831:
URL: https://github.com/apache/beam/pull/25831#discussion_r1137333896


##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation order if the upstream

Review Comment:
   ```suggestion
    * <p>This implementation is useful for preserving mutation order if the upstream
   ```



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and

Review Comment:
   ```suggestion
    * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of bytes row keys and
   ```



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PCollection<Integer>> {
+
+    /** Writes to the HBase instance indicated by the* given Configuration. */

Review Comment:
   ```suggestion
       /** Writes to the HBase instance indicated by the given Configuration. */
   ```



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PCollection<Integer>> {
+
+    /** Writes to the HBase instance indicated by the* given Configuration. */
+    public WriteRowMutations withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "configuration cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    /** Writes to the specified table. */
+    public WriteRowMutations withTableId(String tableId) {
+      checkNotNull(tableId, "tableId cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    private WriteRowMutations(Configuration configuration, String tableId) {
+      this.configuration = configuration;
+      this.tableId = tableId;
+    }
+
+    @Override
+    public PCollection<Integer> expand(PCollection<KV<byte[], RowMutations>> input) {
+      checkNotNull(configuration, "withConfiguration() is required");
+      checkNotNull(tableId, "withTableId() is required");
+      checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+      return input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", configuration.toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+      return configuration.toString().equals(writeRowMutations.configuration.toString())
+          && Objects.equals(tableId, writeRowMutations.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(configuration, tableId);
+    }
+
+    /**
+     * The writeReplace method allows the developer to provide a replacement object that will be
+     * serialized instead of the original one. We use this to keep the enclosed class immutable. For
+     * more details on the technique see <a
+     * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+     * article</a>.
+     */
+    private Object writeReplace() {
+      return new SerializationProxy(this);
+    }
+
+    private static class SerializationProxy implements Serializable {
+      public SerializationProxy() {}
+
+      public SerializationProxy(WriteRowMutations writeRowMutations) {
+        configuration = writeRowMutations.configuration;
+        tableId = writeRowMutations.tableId;
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+        SerializableCoder.of(SerializableConfiguration.class)
+            .encode(new SerializableConfiguration(this.configuration), out);
+
+        StringUtf8Coder.of().encode(this.tableId, out);
+      }
+
+      private void readObject(ObjectInputStream in) throws IOException {
+        this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+        this.tableId = StringUtf8Coder.of().decode(in);
+      }
+
+      Object readResolve() {
+        return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+      }
+
+      private Configuration configuration;
+      private String tableId;
+    }
+
+    private final Configuration configuration;
+    private final String tableId;
+
+    /** Function to write row mutations to a hbase table. */
+    private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {
+
+      public WriteRowMutationsFn(
+          WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) {
+        checkNotNull(writeRowMutations.tableId, "tableId");
+        checkNotNull(writeRowMutations.configuration, "configuration");
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = HBaseSharedConnection.getOrCreate(configuration);
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext c) throws IOException {
+        table = connection.getTable(TableName.valueOf(tableId));
+        recordsWritten = 0;
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Exception {
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        LOG.debug("Wrote {} records", recordsWritten);
+      }
+
+      @Teardown
+      public void tearDown() throws Exception {
+
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        HBaseSharedConnection.close();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        RowMutations mutations = c.element().getValue();
+
+        try {
+          // Use Table instead of BufferedMutator to preserve mutation-ordering
+          table.mutateRow(mutations);
+        } catch (Exception e) {
+          throw new Exception(
+              (String.join(
+                  " ",
+                  "Table",
+                  tableId,
+                  "row",
+                  Bytes.toString(mutations.getRow()),
+                  "mutation failed.",
+                  "\nTable Available/Enabled:",
+                  Boolean.toString(
+                      connection.getAdmin().isTableAvailable(TableName.valueOf(tableId))),
+                  Boolean.toString(
+                      connection.getAdmin().isTableEnabled(TableName.valueOf(tableId))),
+                  "\nConnection Closed/Aborted/Locks:",
+                  Boolean.toString(connection.isClosed()),
+                  Boolean.toString(connection.isAborted()))));
+        }
+
+        // Dummy output so that we can get Dataflow stats for throughput.
+        c.output(1);

Review Comment:
   A dummy output may not be suitable in production code. Either implement a WriteResult, or use PDone for now and implement WriteResult as a future task.



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PCollection<Integer>> {
+
+    /** Writes to the HBase instance indicated by the* given Configuration. */
+    public WriteRowMutations withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "configuration cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    /** Writes to the specified table. */
+    public WriteRowMutations withTableId(String tableId) {
+      checkNotNull(tableId, "tableId cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    private WriteRowMutations(Configuration configuration, String tableId) {
+      this.configuration = configuration;
+      this.tableId = tableId;
+    }
+
+    @Override
+    public PCollection<Integer> expand(PCollection<KV<byte[], RowMutations>> input) {
+      checkNotNull(configuration, "withConfiguration() is required");
+      checkNotNull(tableId, "withTableId() is required");
+      checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+      return input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", configuration.toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+      return configuration.toString().equals(writeRowMutations.configuration.toString())
+          && Objects.equals(tableId, writeRowMutations.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(configuration, tableId);
+    }
+
+    /**
+     * The writeReplace method allows the developer to provide a replacement object that will be
+     * serialized instead of the original one. We use this to keep the enclosed class immutable. For
+     * more details on the technique see <a
+     * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+     * article</a>.
+     */
+    private Object writeReplace() {
+      return new SerializationProxy(this);
+    }
+
+    private static class SerializationProxy implements Serializable {
+      public SerializationProxy() {}
+
+      public SerializationProxy(WriteRowMutations writeRowMutations) {
+        configuration = writeRowMutations.configuration;
+        tableId = writeRowMutations.tableId;
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+        SerializableCoder.of(SerializableConfiguration.class)
+            .encode(new SerializableConfiguration(this.configuration), out);
+
+        StringUtf8Coder.of().encode(this.tableId, out);
+      }
+
+      private void readObject(ObjectInputStream in) throws IOException {
+        this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+        this.tableId = StringUtf8Coder.of().decode(in);
+      }
+
+      Object readResolve() {
+        return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+      }
+
+      private Configuration configuration;
+      private String tableId;
+    }
+
+    private final Configuration configuration;
+    private final String tableId;
+
+    /** Function to write row mutations to a hbase table. */
+    private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {
+
+      public WriteRowMutationsFn(
+          WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) {
+        checkNotNull(writeRowMutations.tableId, "tableId");
+        checkNotNull(writeRowMutations.configuration, "configuration");
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = HBaseSharedConnection.getOrCreate(configuration);
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext c) throws IOException {
+        table = connection.getTable(TableName.valueOf(tableId));
+        recordsWritten = 0;
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Exception {
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        LOG.debug("Wrote {} records", recordsWritten);

Review Comment:
   This is always 0. Is a line that increments recordsWritten missing somewhere?
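
A minimal sketch of the fix this comment points at, assuming the counter is meant to be per-bundle. `CountingWriter` is a hypothetical stand-in that uses no Beam or HBase classes; its methods only mirror the `@StartBundle`, `@ProcessElement`, and `@FinishBundle` hooks in the diff, and the list append stands in for `table.mutateRow(mutations)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for WriteRowMutationsFn; it uses no Beam or HBase classes. */
class CountingWriter {
  private long recordsWritten;
  private final List<String> applied = new ArrayList<>();

  /** Mirrors @StartBundle: reset the per-bundle counter. */
  void startBundle() {
    recordsWritten = 0;
  }

  /** Mirrors @ProcessElement: apply the write, then count it once it has succeeded. */
  void processElement(String rowKey) {
    applied.add(rowKey); // stands in for table.mutateRow(mutations)
    recordsWritten++; // the increment that appears to be missing in the diff
  }

  /** Mirrors @FinishBundle: return the count that the debug log line would report. */
  long finishBundle() {
    return recordsWritten;
  }
}
```

Incrementing after the write, not before it, keeps a failed mutation from being counted, since the exception leaves the method before the counter changes.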



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation order if the upstream
+ * is ordered by row key, as RowMutations will only be applied after previous RowMutations are
+ * successful.
+ *
+ * <p>To configure the sink, you must supply a table id string and a {@link Configuration} to
+ * identify the HBase instance, for example:
+ *
+ * <pre>{@code
+ * Configuration configuration = ...;
+ * PCollection<KV<byte[], RowMutations>> data = ...;
+ *
+ * data.apply("write",
+ *     HBaseIO.writeRowMutations()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table"));
+ * }</pre>
+ *
+ * <p>Note that the transformation emits the number of RowMutations written as an integer after

Review Comment:
   It would be nice to emit some WriteResult (as BigtableIO does). To my knowledge, emitting the number of elements written is not common practice for Beam sinks. The other common practice, returning PDone (which is what HBaseIO.Write currently uses), is an antipattern and will be changed in the future.
   
   More info: https://beam.apache.org/documentation/io/io-standards/#general
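
One possible shape for such a result, sketched in plain Java under stated assumptions. `RowMutationsWriteResult`, `of`, `getRowsWritten`, and `plus` are hypothetical names that appear in neither Beam nor this PR, and the sketch leaves out the Beam wiring (coder, output type of the transform) that a real WriteResult would need.

```java
import java.util.Objects;

/** Hypothetical result type; not part of Beam or of the PR under review. */
final class RowMutationsWriteResult {
  private final long rowsWritten;

  private RowMutationsWriteResult(long rowsWritten) {
    this.rowsWritten = rowsWritten;
  }

  /** Creates a result for a bundle that wrote the given number of rows. */
  static RowMutationsWriteResult of(long rowsWritten) {
    if (rowsWritten < 0) {
      throw new IllegalArgumentException("rowsWritten must be non-negative");
    }
    return new RowMutationsWriteResult(rowsWritten);
  }

  long getRowsWritten() {
    return rowsWritten;
  }

  /** Merges two results, for example when summing across bundles. */
  RowMutationsWriteResult plus(RowMutationsWriteResult other) {
    return of(Math.addExact(rowsWritten, other.rowsWritten));
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof RowMutationsWriteResult
        && rowsWritten == ((RowMutationsWriteResult) o).rowsWritten;
  }

  @Override
  public int hashCode() {
    return Objects.hash(rowsWritten);
  }

  @Override
  public String toString() {
    return "RowMutationsWriteResult{rowsWritten=" + rowsWritten + "}";
  }
}
```

A named type like this leaves room to add fields later (failed rows, for instance) without changing the element type that downstream transforms consume, which a bare `Integer` output does not.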



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation order if the upstream

Review Comment:
   Documentation for a specific IO generally does not make runner-specific statements, because the IO connector artifact does not depend on dataflow-runner-java. What makes this Dataflow specific?


