Posted to github@beam.apache.org by "georgecma (via GitHub)" <gi...@apache.org> on 2023/03/16 17:23:13 UTC

[GitHub] [beam] georgecma opened a new pull request, #25864: feat: two-way replication for Hbase-Bigtable replication

georgecma opened a new pull request, #25864:
URL: https://github.com/apache/beam/pull/25864

   Fixes https://github.com/googleapis/java-bigtable-hbase/pull/3920
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] georgecma commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "georgecma (via GitHub)" <gi...@apache.org>.
georgecma commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1143954688


##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+/** Utility functions to help assert equality between mutation lists for testing purposes. */
+public class HashUtils {

Review Comment:
   added <p><b>Internal only:</b> 





[GitHub] [beam] Abacn commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1145484111


##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't
+ * natively serializable.
+ */
+class HBaseRowMutationsCoder extends StructuredCoder<RowMutations> implements Serializable {
+  private static final HBaseRowMutationsCoder INSTANCE = new HBaseRowMutationsCoder();
+
+  public HBaseRowMutationsCoder() {
+    byteArrayCoder = ByteArrayCoder.of();
+    listCoder = ListCoder.of(HBaseMutationCoder.of());
+  }
+
+  public static HBaseRowMutationsCoder of() {
+    return INSTANCE;
+  }
+
+  private final ByteArrayCoder byteArrayCoder;
+  private final ListCoder<Mutation> listCoder;
+
+  @Override
+  public void encode(RowMutations value, OutputStream outStream) throws IOException {
+    byteArrayCoder.encode(value.getRow(), outStream);
+    listCoder.encode(value.getMutations(), outStream);
+  }
+
+  @Override
+  public RowMutations decode(InputStream inStream) throws IOException {
+
+    byte[] rowKey = byteArrayCoder.decode(inStream);
+    List<Mutation> mutations = listCoder.decode(inStream);
+
+    RowMutations rowMutations = new RowMutations(rowKey);
+    for (Mutation m : mutations) {
+      MutationType type = getType(m);
+
+      if (type == MutationType.PUT) {
+        rowMutations.add((Put) m);
+      } else if (type == MutationType.DELETE) {
+        rowMutations.add((Delete) m);
+      }

Review Comment:
   Should we throw an IllegalArgumentException here instead of implicitly ignoring other mutation types, as in getType() below?
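
A minimal sketch of the stricter behaviour this comment asks about. It uses hypothetical stand-in types (`Op`, `PutOp`, `DeleteOp`, `AppendOp`, `StrictDecodeSketch`) rather than the real HBase `Mutation` classes so that it compiles on its own. The idea is only that a mutation which is neither a put nor a delete fails loudly instead of being dropped; it is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for HBase's Mutation, Put, Delete, and Append; not the real API.
interface Op {}

final class PutOp implements Op {}

final class DeleteOp implements Op {}

final class AppendOp implements Op {}

final class StrictDecodeSketch {
  /** Copies the supported operations and rejects anything else instead of skipping it. */
  static List<Op> collectSupported(List<Op> decoded) {
    List<Op> result = new ArrayList<>();
    for (Op op : decoded) {
      if (op instanceof PutOp || op instanceof DeleteOp) {
        result.add(op);
      } else {
        // Fail fast so that an unsupported mutation cannot be lost silently.
        throw new IllegalArgumentException(
            "Unsupported mutation type: " + op.getClass().getSimpleName());
      }
    }
    return result;
  }
}
```

With this shape, a caller that passes an unsupported operation sees an exception at decode time rather than a row that is quietly missing a mutation.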



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker, i.e. connectors are transient within
+ * single worker machine. Connectors are not persisted between worker machines as Connection
+ * serialization is not implemented. Each worker will create its own connection and share it between
+ * all its threads.
+ */
+class HBaseSharedConnection implements Serializable {
+  private static final long serialVersionUID = 5252999807656940415L;
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class);
+
+  // Transient connection to be initialized per worker
+  // Wrap Connection in array because static Connection cannot be non-null in beam repo
+  private static @MonotonicNonNull Connection connection = null;
+  // Number of threads using the shared connection, close connection if connectionCount goes to 0
+  private static int connectionCount;
+
+  /**
+   * Create or return existing Hbase connection.
+   *
+   * @param configuration Hbase configuration
+   * @return Hbase connection
+   * @throws IOException
+   */
+  public static synchronized @MonotonicNonNull Connection getOrCreate(Configuration configuration)
+      throws IOException {
+    if (connection == null || connection.isClosed()) {
+      connection = ConnectionFactory.createConnection(configuration);
+      connectionCount = 0;
+    }
+    connectionCount++;
+    return connection;
+  }
+
+  /**
+   * Decrement connector count and close connection if no more connector is using it.
+   *
+   * @throws IOException
+   */
+  public static synchronized void close() throws IOException {
+    connectionCount--;
+    if (connectionCount == 0) {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+    if (connectionCount < 0) {
+      LOG.warn("Connection count at " + connectionCount + ", should not be possible");

Review Comment:
   Looking at the class alone, this is actually possible: close() is a public method, so it can be called multiple times. Should we warn that an already closed connection is being closed and set the counter back to 0?
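
The suggestion above could look roughly like the following sketch. `RefCountedCloseSketch` and `FakeConnection` are hypothetical names, and `FakeConnection` merely stands in for an HBase `Connection`; the boolean return value is an addition for illustration. An extra `close()` call is reported and the counter stays at zero instead of going negative.

```java
// Hypothetical reference-counted holder; FakeConnection stands in for an HBase Connection.
final class RefCountedCloseSketch {
  static final class FakeConnection {
    boolean closed;

    void close() {
      closed = true;
    }
  }

  private static FakeConnection connection;
  private static int connectionCount;

  static synchronized FakeConnection getOrCreate() {
    if (connection == null || connection.closed) {
      connection = new FakeConnection();
      connectionCount = 0;
    }
    connectionCount++;
    return connection;
  }

  /** Releases one reference; an extra call is reported and leaves the count at zero. */
  static synchronized boolean close() {
    if (connectionCount <= 0) {
      System.err.println("close() called with no open references; ignoring");
      connectionCount = 0;
      return false;
    }
    connectionCount--;
    if (connectionCount == 0 && connection != null) {
      connection.close();
    }
    return true;
  }

  static synchronized int count() {
    return connectionCount;
  }
}
```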



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker, i.e. connectors are transient within
+ * single worker machine. Connectors are not persisted between worker machines as Connection
+ * serialization is not implemented. Each worker will create its own connection and share it between
+ * all its threads.
+ */
+class HBaseSharedConnection implements Serializable {
+  private static final long serialVersionUID = 5252999807656940415L;
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class);
+
+  // Transient connection to be initialized per worker
+  // Wrap Connection in array because static Connection cannot be non-null in beam repo
+  private static @MonotonicNonNull Connection connection = null;
+  // Number of threads using the shared connection, close connection if connectionCount goes to 0
+  private static int connectionCount;
+
+  /**
+   * Create or return existing Hbase connection.
+   *
+   * @param configuration Hbase configuration
+   * @return Hbase connection
+   * @throws IOException
+   */
+  public static synchronized @MonotonicNonNull Connection getOrCreate(Configuration configuration)

Review Comment:
   Is `@MonotonicNonNull` needed here? The method should always return a non-null connection.



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't
+ * natively serializable.
+ */
+class HBaseRowMutationsCoder extends StructuredCoder<RowMutations> implements Serializable {
+  private static final HBaseRowMutationsCoder INSTANCE = new HBaseRowMutationsCoder();
+
+  public HBaseRowMutationsCoder() {
+    byteArrayCoder = ByteArrayCoder.of();
+    listCoder = ListCoder.of(HBaseMutationCoder.of());
+  }
+
+  public static HBaseRowMutationsCoder of() {
+    return INSTANCE;
+  }
+
+  private final ByteArrayCoder byteArrayCoder;
+  private final ListCoder<Mutation> listCoder;
+
+  @Override
+  public void encode(RowMutations value, OutputStream outStream) throws IOException {
+    byteArrayCoder.encode(value.getRow(), outStream);
+    listCoder.encode(value.getMutations(), outStream);
+  }
+
+  @Override
+  public RowMutations decode(InputStream inStream) throws IOException {
+
+    byte[] rowKey = byteArrayCoder.decode(inStream);
+    List<Mutation> mutations = listCoder.decode(inStream);
+
+    RowMutations rowMutations = new RowMutations(rowKey);
+    for (Mutation m : mutations) {
+      MutationType type = getType(m);
+
+      if (type == MutationType.PUT) {
+        rowMutations.add((Put) m);
+      } else if (type == MutationType.DELETE) {
+        rowMutations.add((Delete) m);
+      }
+    }
+    return rowMutations;
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<

Review Comment:
   These annotations are not necessary; they should be added automatically during compilation. `public List<? extends Coder<?>>` should suffice. The same applies below.





[GitHub] [beam] github-actions[bot] commented on pull request #25864: Overload HbaseIO with KV

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25864:
URL: https://github.com/apache/beam/pull/25864#issuecomment-1472467618

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] Abacn merged pull request #25864: Overload HbaseIO with KV

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #25864:
URL: https://github.com/apache/beam/pull/25864




[GitHub] [beam] georgecma commented on pull request #25864: Overload HbaseIO with KV

Posted by "georgecma (via GitHub)" <gi...@apache.org>.
georgecma commented on PR #25864:
URL: https://github.com/apache/beam/pull/25864#issuecomment-1480326305

   Resolved issues




[GitHub] [beam] Abacn commented on pull request #25864: Overload HbaseIO with KV

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25864:
URL: https://github.com/apache/beam/pull/25864#issuecomment-1478635682

   (The Java_xxx_IO_Direct tests should report SpotBugs warnings the way Java_PreCommit does. Added to my backlog to fix.)




[GitHub] [beam] georgecma commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "georgecma (via GitHub)" <gi...@apache.org>.
georgecma commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1143888423


##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+/** Utility functions to help assert equality between mutation lists for testing purposes. */
+public class HashUtils {

Review Comment:
   HashUtils needs to be public because it is in the package org.apache.beam.sdk.io.hbase.utils.
   We could move the test utils up into the parent folder, but that would make the tests less tidy.
   





[GitHub] [beam] Abacn commented on pull request #25864: Overload HbaseIO with KV

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25864:
URL: https://github.com/apache/beam/pull/25864#issuecomment-1478634570

   There are the following warnings:
   
   ```
   Bad practice Warnings
   Code	Warning
   FS	Format string should use %n rather than \n in org.apache.beam.sdk.io.hbase.HBaseSharedConnection.getDebugString()
   Se	Class org.apache.beam.sdk.io.hbase.HBaseIO$WriteRowMutations defines non-transient non-serializable instance field configuration
   ```
   
   These can be found by running `./gradlew :sdks:java:io:hbase:spotbugsMain` locally.
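
For readers unfamiliar with the two warning codes, here is a small self-contained sketch of the usual remedies. `SpotBugsFixSketch` and `FakeConfiguration` are hypothetical names; `FakeConfiguration` only stands in for a non-serializable Hadoop `Configuration`, and this is not necessarily how the PR resolved the warnings. The "Se" warning is commonly addressed by marking the non-serializable field `transient` and rebuilding it from serializable state, and the "FS" warning by using `%n` in format strings.

```java
import java.io.Serializable;

// Hypothetical class; FakeConfiguration stands in for a non-serializable Hadoop Configuration.
final class SpotBugsFixSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  static final class FakeConfiguration {
    final String quorum;

    FakeConfiguration(String quorum) {
      this.quorum = quorum;
    }
  }

  // "Se" remedy: keep a serializable value and mark the non-serializable object transient.
  private final String quorum;
  private transient FakeConfiguration configuration;

  SpotBugsFixSketch(String quorum) {
    this.quorum = quorum;
  }

  /** Rebuilds the transient field on first use, for example after deserialization. */
  FakeConfiguration getConfiguration() {
    if (configuration == null) {
      configuration = new FakeConfiguration(quorum);
    }
    return configuration;
  }

  // "FS" remedy: %n expands to the platform line separator, unlike a hard-coded \n.
  String getDebugString() {
    return String.format("quorum: %s%n", quorum);
  }
}
```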




[GitHub] [beam] georgecma commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "georgecma (via GitHub)" <gi...@apache.org>.
georgecma commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1151099337


##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker, i.e. connectors are transient within

Review Comment:
   We map connections according to a specific attribute of the Configuration, zookeeper.quorum, which is guaranteed to be unique per HBase cluster.
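
A minimal sketch of what such a keyed registry could look like, assuming one shared connection and one reference count per key. `KeyedConnectionSketch` and `FakeConnection` are hypothetical names, `FakeConnection` stands in for an HBase `Connection`, and the plain string key plays the role of the ZooKeeper quorum value mentioned above; the merged implementation may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; FakeConnection stands in for an HBase Connection.
final class KeyedConnectionSketch {
  static final class FakeConnection {
    final String quorum;
    boolean closed;

    FakeConnection(String quorum) {
      this.quorum = quorum;
    }
  }

  private static final Map<String, FakeConnection> CONNECTIONS = new HashMap<>();
  private static final Map<String, Integer> COUNTS = new HashMap<>();

  /** Returns the shared connection for this quorum, creating it on first use. */
  static synchronized FakeConnection getOrCreate(String quorum) {
    FakeConnection existing = CONNECTIONS.get(quorum);
    if (existing == null || existing.closed) {
      existing = new FakeConnection(quorum);
      CONNECTIONS.put(quorum, existing);
      COUNTS.put(quorum, 0);
    }
    COUNTS.merge(quorum, 1, Integer::sum);
    return existing;
  }

  /** Releases one reference and closes the connection once nobody uses it. */
  static synchronized void close(String quorum) {
    Integer count = COUNTS.get(quorum);
    if (count == null || count <= 0) {
      return; // Nothing is open for this key.
    }
    if (count == 1) {
      CONNECTIONS.remove(quorum).closed = true;
      COUNTS.remove(quorum);
    } else {
      COUNTS.put(quorum, count - 1);
    }
  }
}
```

Two sinks that target different clusters then receive different connections, while sinks that target the same cluster keep sharing one.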





[GitHub] [beam] georgecma commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "georgecma (via GitHub)" <gi...@apache.org>.
georgecma commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1143888863


##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.nio.charset.StandardCharsets;
+
+/** Constants used for testing purposes. */
+public class TestConstants {

Review Comment:
   TestConstants needs to be public because it is in the package org.apache.beam.sdk.io.hbase.utils.
   We could move the test utils up into the parent folder, but that would make the tests less tidy.
   





[GitHub] [beam] Abacn commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1143899928


##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+/** Utility functions to help assert equality between mutation lists for testing purposes. */
+public class HashUtils {

Review Comment:
   I see. Then you could add something like "Internal only: ..." to the Javadoc, as other public-scoped internal utility classes in Beam do.





[GitHub] [beam] Abacn commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1145535514


##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker, i.e. connectors are transient within

Review Comment:
   I just realized that the current implementation does not handle multiple sinks writing to different destinations. Every sink will write using whichever configuration was set up first, regardless of the configuration passed in. We could maintain a static Map keyed by Configuration to fix this.
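A minimal sketch of the map-based idea, under stated assumptions: `SharedConnectionRegistry`, `FakeConnection`, and the `String` key are hypothetical names, not part of the PR. The type parameter `C` stands in for an HBase `Connection`, and the key stands in for whatever uniquely identifies a `Configuration`. In the real class the map would presumably live in a static field.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch only: one shared, reference-counted connection per configuration key. */
final class SharedConnectionRegistry<C extends Closeable> {
  private static final class Entry<C> {
    final C connection;
    int refCount;

    Entry(C connection) {
      this.connection = connection;
    }
  }

  private final Map<String, Entry<C>> entries = new HashMap<>();
  private final Function<String, C> factory;

  SharedConnectionRegistry(Function<String, C> factory) {
    this.factory = factory;
  }

  /** Returns the connection for this key, creating it on first use. */
  synchronized C getOrCreate(String configKey) {
    Entry<C> entry = entries.computeIfAbsent(configKey, k -> new Entry<>(factory.apply(k)));
    entry.refCount++;
    return entry.connection;
  }

  /** Releases one reference; closes and forgets the connection when none remain. */
  synchronized void close(String configKey) {
    Entry<C> entry = entries.get(configKey);
    if (entry == null) {
      return;
    }
    if (--entry.refCount <= 0) {
      entries.remove(configKey);
      try {
        entry.connection.close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  synchronized int openConnections() {
    return entries.size();
  }
}

/** Hypothetical stand-in used to exercise the registry without an HBase cluster. */
final class FakeConnection implements Closeable {
  final String target;
  boolean closed;

  FakeConnection(String target) {
    this.target = target;
  }

  @Override
  public void close() {
    closed = true;
  }
}
```

With this shape, two sinks that pass the same key share one connection, while a sink with a different key gets its own. Each `close` call releases one reference instead of closing a connection that other threads may still be using.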





[GitHub] [beam] Abacn commented on a diff in pull request #25864: Overload HbaseIO with KV

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1142490279


##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +795,194 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PDone> {
+
+    /** Writes to the HBase instance indicated by the given Configuration. */
+    public WriteRowMutations withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "configuration cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    /** Writes to the specified table. */
+    public WriteRowMutations withTableId(String tableId) {
+      checkNotNull(tableId, "tableId cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    private WriteRowMutations(Configuration configuration, String tableId) {
+      this.configuration = configuration;
+      this.tableId = tableId;
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<byte[], RowMutations>> input) {
+      checkNotNull(configuration, "withConfiguration() is required");
+      checkNotNull(tableId, "withTableId() is required");
+      checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+      input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", configuration.toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+      return configuration.toString().equals(writeRowMutations.configuration.toString())
+          && Objects.equals(tableId, writeRowMutations.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(configuration, tableId);
+    }
+
+    /**
+     * The writeReplace method allows the developer to provide a replacement object that will be
+     * serialized instead of the original one. We use this to keep the enclosed class immutable. For
+     * more details on the technique see <a
+     * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+     * article</a>.
+     */
+    private Object writeReplace() {
+      return new SerializationProxy(this);
+    }
+
+    private static class SerializationProxy implements Serializable {
+      public SerializationProxy() {}
+
+      public SerializationProxy(WriteRowMutations writeRowMutations) {
+        configuration = writeRowMutations.configuration;
+        tableId = writeRowMutations.tableId;
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+        SerializableCoder.of(SerializableConfiguration.class)
+            .encode(new SerializableConfiguration(this.configuration), out);
+
+        StringUtf8Coder.of().encode(this.tableId, out);
+      }
+
+      private void readObject(ObjectInputStream in) throws IOException {
+        this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+        this.tableId = StringUtf8Coder.of().decode(in);
+      }
+
+      Object readResolve() {
+        return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+      }
+
+      private Configuration configuration;
+      private String tableId;
+    }
+
+    private final Configuration configuration;
+    private final String tableId;
+
+    /** Function to write row mutations to a hbase table. */
+    private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {
+
+      public WriteRowMutationsFn(
+          WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) {
+        checkNotNull(writeRowMutations.tableId, "tableId");
+        checkNotNull(writeRowMutations.configuration, "configuration");
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = HBaseSharedConnection.getOrCreate(configuration);
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext c) throws IOException {
+        table = connection.getTable(TableName.valueOf(tableId));
+        recordsWritten = 0;
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Exception {
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        LOG.debug("Wrote {} records", recordsWritten);
+      }
+
+      @Teardown
+      public void tearDown() throws Exception {
+
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        HBaseSharedConnection.close();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        RowMutations mutations = c.element().getValue();
+
+        try {
+          // Use Table instead of BufferedMutator to preserve mutation-ordering
+          table.mutateRow(mutations);
+          recordsWritten++;
+        } catch (Exception e) {
+          throw new Exception(

Review Comment:
   Catch the checked IOException and throw a RuntimeException instead?
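A minimal sketch of that suggestion, assuming the only checked exception of interest is the `IOException` declared by `Table.mutateRow`. `RowWriter` and `WriteHelper` are hypothetical stand-ins so the example runs without HBase; they are not names from the PR.

```java
import java.io.IOException;

/** Hypothetical stand-in for the single Table call made by the DoFn. */
interface RowWriter {
  void mutateRow(String rowKey) throws IOException;
}

final class WriteHelper {
  private WriteHelper() {}

  /**
   * Catches only the checked IOException and rethrows it unchecked, keeping the cause.
   * Any other exception propagates unchanged rather than being caught by a broad
   * catch (Exception e).
   */
  static void write(RowWriter writer, String tableId, String rowKey) {
    try {
      writer.mutateRow(rowKey);
    } catch (IOException e) {
      throw new RuntimeException(
          String.format("Failed to write row %s to table %s", rowKey, tableId), e);
    }
  }
}
```

Narrowing the catch this way means `processElement` no longer needs to declare `throws Exception` for this call, and the original failure stays reachable through `getCause()`.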



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker. Connectors are not persisted between

Review Comment:
   i.e., connectors are transient within a single worker machine



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker. Connectors are not persisted between
+ * worker machines as Connection serialization is not implemented. Each worker will create its own
+ * connection and share it between all its threads.
+ */
+public class HBaseSharedConnection implements Serializable {
+  private static final long serialVersionUID = 5252999807656940415L;
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class);
+
+  // Transient connection to be initialized per worker
+  // Wrap Connection in array because static Connection cannot be non-null in beam repo
+  private static Connection[] connection = new Connection[1];

Review Comment:
   Should use
   ```
    private static @MonotonicNonNull Connection connection = null;
   ```
   here, then assign connection in getOrCreate. This resolves the Checker Framework violation.



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't
+ * natively serializable.
+ */
+public class HBaseRowMutationsCoder extends AtomicCoder<RowMutations> implements Serializable {

Review Comment:
   This should be a StructuredCoder<RowMutations> that uses existing coders to encode/decode rather than operating on the inStream/outStream directly. We already have the coders needed (ByteArrayCoder for the key, ListCoder and HBaseMutationCoder for the list of mutations).
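To illustrate the composition pattern being asked for, here is a self-contained sketch that deliberately avoids Beam's classes. `Codec`, `BytesCodec`, `ListCodec`, `Row`, and `RowCodec` are hypothetical stand-ins (roughly playing the roles of `Coder`, `ByteArrayCoder`, `ListCoder`, `RowMutations`, and the coder under review); this is not Beam's API and not the PR's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal stand-in for a coder interface. */
interface Codec<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;
}

/** Length-prefixed byte array (the leaf codec). */
final class BytesCodec implements Codec<byte[]> {
  public void encode(byte[] value, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(value.length);
    data.write(value);
    data.flush();
  }

  public byte[] decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    byte[] value = new byte[data.readInt()];
    data.readFully(value);
    return value;
  }
}

/** Count-prefixed list that delegates every element to a component codec. */
final class ListCodec<T> implements Codec<List<T>> {
  private final Codec<T> element;

  ListCodec(Codec<T> element) {
    this.element = element;
  }

  public void encode(List<T> value, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(value.size());
    data.flush();
    for (T item : value) {
      element.encode(item, out);
    }
  }

  public List<T> decode(InputStream in) throws IOException {
    int size = new DataInputStream(in).readInt();
    List<T> result = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      result.add(element.decode(in));
    }
    return result;
  }
}

/** Stand-in for RowMutations: a row key plus an ordered list of serialized mutations. */
final class Row {
  final byte[] key;
  final List<byte[]> mutations;

  Row(byte[] key, List<byte[]> mutations) {
    this.key = key;
    this.mutations = mutations;
  }
}

/** Composite codec: no byte-level logic of its own, only delegation to components. */
final class RowCodec implements Codec<Row> {
  private final Codec<byte[]> keyCodec = new BytesCodec();
  private final Codec<List<byte[]>> mutationsCodec = new ListCodec<>(new BytesCodec());

  public void encode(Row value, OutputStream out) throws IOException {
    keyCodec.encode(value.key, out);
    mutationsCodec.encode(value.mutations, out);
  }

  public Row decode(InputStream in) throws IOException {
    byte[] key = keyCodec.decode(in);
    return new Row(key, mutationsCodec.decode(in));
  }

  /** Convenience helper for the usage example: encode to memory, then decode. */
  static Row roundTrip(Row row) {
    RowCodec codec = new RowCodec();
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      codec.encode(row, out);
      return codec.decode(new ByteArrayInputStream(out.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The point of the design is that `RowCodec` never reads or writes raw bytes itself, so the wire format is defined entirely by the component codecs and the list order is preserved by construction.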



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +795,194 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PDone> {
+
+    /** Writes to the HBase instance indicated by the given Configuration. */
+    public WriteRowMutations withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "configuration cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    /** Writes to the specified table. */
+    public WriteRowMutations withTableId(String tableId) {
+      checkNotNull(tableId, "tableId cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    private WriteRowMutations(Configuration configuration, String tableId) {
+      this.configuration = configuration;
+      this.tableId = tableId;
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<byte[], RowMutations>> input) {
+      checkNotNull(configuration, "withConfiguration() is required");
+      checkNotNull(tableId, "withTableId() is required");
+      checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+      input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", configuration.toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+      return configuration.toString().equals(writeRowMutations.configuration.toString())
+          && Objects.equals(tableId, writeRowMutations.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(configuration, tableId);
+    }
+
+    /**
+     * The writeReplace method allows the developer to provide a replacement object that will be
+     * serialized instead of the original one. We use this to keep the enclosed class immutable. For
+     * more details on the technique see <a
+     * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+     * article</a>.
+     */
+    private Object writeReplace() {
+      return new SerializationProxy(this);
+    }
+
+    private static class SerializationProxy implements Serializable {
+      public SerializationProxy() {}
+
+      public SerializationProxy(WriteRowMutations writeRowMutations) {
+        configuration = writeRowMutations.configuration;
+        tableId = writeRowMutations.tableId;
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+        SerializableCoder.of(SerializableConfiguration.class)
+            .encode(new SerializableConfiguration(this.configuration), out);
+
+        StringUtf8Coder.of().encode(this.tableId, out);
+      }
+
+      private void readObject(ObjectInputStream in) throws IOException {
+        this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+        this.tableId = StringUtf8Coder.of().decode(in);
+      }
+
+      Object readResolve() {
+        return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+      }
+
+      private Configuration configuration;
+      private String tableId;
+    }
+
+    private final Configuration configuration;
+    private final String tableId;
+
+    /** Function to write row mutations to a hbase table. */
+    private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {
+
+      public WriteRowMutationsFn(
+          WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) {

Review Comment:
   Clean up the leftover commented-out code?



##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.nio.charset.StandardCharsets;
+
+/** Constants used for testing purposes. */
+public class TestConstants {

Review Comment:
   Same here: no need to be public.



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker. Connectors are not persisted between
+ * worker machines as Connection serialization is not implemented. Each worker will create its own
+ * connection and share it between all its threads.
+ */
+public class HBaseSharedConnection implements Serializable {

Review Comment:
   no need to be public?



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't
+ * natively serializable.
+ */
+public class HBaseRowMutationsCoder extends AtomicCoder<RowMutations> implements Serializable {

Review Comment:
   no need to be public



##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+/** Utility functions to help assert equality between mutation lists for testing purposes. */
+public class HashUtils {

Review Comment:
   no need to be public?


