You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/01/23 23:02:11 UTC
[38/50] [abbrv] hbase git commit: HBASE-17491 Remove all setters from
HTable interface and introduce a TableBuilder to build Table instance
HBASE-17491 Remove all setters from HTable interface and introduce a TableBuilder to build Table instance
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/07e0a30e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/07e0a30e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/07e0a30e
Branch: refs/heads/HBASE-16961
Commit: 07e0a30efa332ab451e5f5729dd8257eced82c4d
Parents: 7754a96
Author: Yu Li <li...@apache.org>
Authored: Mon Jan 23 13:51:03 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Mon Jan 23 13:57:01 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/Connection.java | 15 +++-
.../hbase/client/ConnectionConfiguration.java | 11 ++-
.../hbase/client/ConnectionImplementation.java | 12 ++-
.../org/apache/hadoop/hbase/client/HTable.java | 55 ++++++-------
.../org/apache/hadoop/hbase/client/Table.java | 6 ++
.../hadoop/hbase/client/TableBuilder.java | 71 +++++++++++++++++
.../hadoop/hbase/client/TableBuilderBase.java | 83 ++++++++++++++++++++
.../hbase/spark/HBaseConnectionCacheSuite.scala | 3 +-
8 files changed, 222 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index b979c6a..a8cd296 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -83,7 +83,9 @@ public interface Connection extends Abortable, Closeable {
* @param tableName the name of the table
* @return a Table to use for interactions with this table
*/
- Table getTable(TableName tableName) throws IOException;
+ default Table getTable(TableName tableName) throws IOException {
+ return getTable(tableName, null);
+ }
/**
* Retrieve a Table implementation for accessing a table.
@@ -102,7 +104,9 @@ public interface Connection extends Abortable, Closeable {
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return a Table to use for interactions with this table
*/
- Table getTable(TableName tableName, ExecutorService pool) throws IOException;
+ default Table getTable(TableName tableName, ExecutorService pool) throws IOException {
+ return getTableBuilder(tableName, pool).build();
+ }
/**
* <p>
@@ -173,4 +177,11 @@ public interface Connection extends Abortable, Closeable {
* @return true if this connection is closed
*/
boolean isClosed();
+
+ /**
+ * Returns a {@link TableBuilder} for creating {@link Table}.
+ * @param tableName the name of the table
+ * @param pool the thread pool to use for requests like batch and scan
+ */
+ TableBuilder getTableBuilder(TableName tableName, ExecutorService pool);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 41f5baf..bea91da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -42,9 +42,10 @@ public class ConnectionConfiguration {
private final int replicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
+ private final int rpcTimeout;
private final int readRpcTimeout;
private final int writeRpcTimeout;
- // toggle for async/sync prefetch
+ // toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch;
/**
@@ -82,6 +83,9 @@ public class ConnectionConfiguration {
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+ this.rpcTimeout =
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
@@ -108,6 +112,7 @@ public class ConnectionConfiguration {
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
}
public int getReadRpcTimeout() {
@@ -158,4 +163,8 @@ public class ConnectionConfiguration {
return clientScannerAsyncPrefetch;
}
+ public int getRpcTimeout() {
+ return rpcTimeout;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index adbc7f9..ca21365 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -327,9 +327,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
- public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
- return new HTable(tableName, this, connectionConfig,
- rpcCallerFactory, rpcControllerFactory, pool);
+ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
+ return new TableBuilderBase(tableName, connectionConfig) {
+
+ @Override
+ public Table build() {
+ return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
+ rpcControllerFactory, pool);
+ }
+ };
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index fd5eda3..3bb0a77 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -82,10 +82,9 @@ import org.apache.hadoop.hbase.util.Threads;
* Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
* class comment for an example of how.
*
- * <p>This class is NOT thread safe for reads nor writes.
- * In the case of writes (Put, Delete), the underlying write buffer can
- * be corrupted if multiple threads contend over a single HTable instance.
- * In the case of reads, some fields used by a Scan are shared among all threads.
+ * <p>This class is thread safe since 2.0.0 as long as none of the setter methods are invoked.
+ * All setters have been moved into {@link TableBuilder}; they are retained here only for
+ * backward compatibility. TODO: remove them soon.
*
* <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
* InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
@@ -115,10 +114,12 @@ public class HTable implements Table {
private final long scannerMaxResultSize;
private final ExecutorService pool; // For Multi & Scan
private int operationTimeout; // global timeout for each blocking method with retrying rpc
+ private final int rpcTimeout; // FIXME we should use this for rpc like batch and checkAndXXX
private int readRpcTimeout; // timeout for each read rpc request
private int writeRpcTimeout; // timeout for each write rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final HRegionLocator locator;
+ private final long writeBufferSize;
/** The Async process for batch */
@VisibleForTesting
@@ -150,31 +151,24 @@ public class HTable implements Table {
* Creates an object to access a HBase table.
* Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to
* get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
- * @param tableName Name of the table.
* @param connection Connection to be used.
+ * @param builder The table builder
+ * @param rpcCallerFactory The RPC caller factory
+ * @param rpcControllerFactory The RPC controller factory
* @param pool ExecutorService to be used.
- * @throws IOException if a remote or network exception occurs
*/
@InterfaceAudience.Private
- protected HTable(TableName tableName, final ClusterConnection connection,
- final ConnectionConfiguration tableConfig,
+ protected HTable(final ClusterConnection connection,
+ final TableBuilderBase builder,
final RpcRetryingCallerFactory rpcCallerFactory,
final RpcControllerFactory rpcControllerFactory,
- final ExecutorService pool) throws IOException {
+ final ExecutorService pool) {
if (connection == null || connection.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}
- if (tableName == null) {
- throw new IllegalArgumentException("Given table name is null");
- }
- this.tableName = tableName;
this.connection = connection;
this.configuration = connection.getConfiguration();
- if (tableConfig == null) {
- connConfiguration = new ConnectionConfiguration(configuration);
- } else {
- connConfiguration = tableConfig;
- }
+ this.connConfiguration = connection.getConnectionConfiguration();
if (pool == null) {
this.pool = getDefaultExecutor(this.configuration);
this.cleanupPoolOnClose = true;
@@ -194,10 +188,12 @@ public class HTable implements Table {
this.rpcControllerFactory = rpcControllerFactory;
}
- this.operationTimeout = tableName.isSystemTable() ?
- connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
- this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
- this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
+ this.tableName = builder.tableName;
+ this.operationTimeout = builder.operationTimeout;
+ this.rpcTimeout = builder.rpcTimeout;
+ this.readRpcTimeout = builder.readRpcTimeout;
+ this.writeRpcTimeout = builder.writeRpcTimeout;
+ this.writeBufferSize = builder.writeBufferSize;
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
@@ -215,15 +211,16 @@ public class HTable implements Table {
connection = conn;
this.tableName = mutator.getName();
this.configuration = connection.getConfiguration();
- connConfiguration = new ConnectionConfiguration(configuration);
+ connConfiguration = connection.getConnectionConfiguration();
cleanupPoolOnClose = false;
this.mutator = mutator;
- this.operationTimeout = tableName.isSystemTable() ?
- connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
+ this.operationTimeout = connConfiguration.getOperationTimeout();
+ this.rpcTimeout = connConfiguration.getRpcTimeout();
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
+ this.writeBufferSize = connConfiguration.getWriteBufferSize();
this.rpcControllerFactory = null;
this.rpcCallerFactory = null;
this.pool = mutator.getPool();
@@ -1058,6 +1055,7 @@ public class HTable implements Table {
* @throws IOException if a remote or network exception occurs.
*/
@Override
+ @Deprecated
public void setWriteBufferSize(long writeBufferSize) throws IOException {
getBufferedMutator();
mutator.setWriteBufferSize(writeBufferSize);
@@ -1162,6 +1160,7 @@ public class HTable implements Table {
}
@Override
+ @Deprecated
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
if (mutator != null) {
@@ -1177,7 +1176,7 @@ public class HTable implements Table {
@Override
@Deprecated
public int getRpcTimeout() {
- return readRpcTimeout;
+ return rpcTimeout;
}
@Override
@@ -1193,6 +1192,7 @@ public class HTable implements Table {
}
@Override
+ @Deprecated
public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
if (mutator != null) {
@@ -1204,6 +1204,7 @@ public class HTable implements Table {
public int getReadRpcTimeout() { return readRpcTimeout; }
@Override
+ @Deprecated
public void setReadRpcTimeout(int readRpcTimeout) {
this.readRpcTimeout = readRpcTimeout;
}
@@ -1335,7 +1336,7 @@ public class HTable implements Table {
this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
new BufferedMutatorParams(tableName)
.pool(pool)
- .writeBufferSize(connConfiguration.getWriteBufferSize())
+ .writeBufferSize(writeBufferSize)
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
.opertationTimeout(operationTimeout)
.rpcTimeout(writeRpcTimeout)
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 0f30cb4..90fee8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -593,7 +593,9 @@ public interface Table extends Closeable {
* total time being blocking reach the operation timeout before retries exhausted, it will break
* early and throw SocketTimeoutException.
* @param operationTimeout the total timeout of each operation in millisecond.
+ * @deprecated since 2.0.0, use {@link TableBuilder#setOperationTimeout} instead
*/
+ @Deprecated
void setOperationTimeout(int operationTimeout);
/**
@@ -637,7 +639,9 @@ public interface Table extends Closeable {
* until retries exhausted or operation timeout reached.
*
* @param readRpcTimeout
+ * @deprecated since 2.0.0, use {@link TableBuilder#setReadRpcTimeout} instead
*/
+ @Deprecated
void setReadRpcTimeout(int readRpcTimeout);
/**
@@ -652,6 +656,8 @@ public interface Table extends Closeable {
* until retries exhausted or operation timeout reached.
*
* @param writeRpcTimeout
+ * @deprecated since 2.0.0, use {@link TableBuilder#setWriteRpcTimeout} instead
*/
+ @Deprecated
void setWriteRpcTimeout(int writeRpcTimeout);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java
new file mode 100644
index 0000000..27e1596
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * For creating {@link Table} instance.
+ * <p>
+ * The implementation should have default configurations set before returning the builder to user.
+ * So users are free to only set the configurations they care about to create a new
+ * Table instance.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TableBuilder {
+
+ /**
+ * Set timeout for a whole operation such as get, put or delete. Notice that scan will not be
+ * affected by this value, see scanTimeoutNs.
+ * <p>
+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
+ * we will stop retrying when we reach any of the limitations.
+ */
+ TableBuilder setOperationTimeout(int timeout);
+
+ /**
+ * Set timeout for each rpc request.
+ * <p>
+ * Notice that this will <strong>NOT</strong> change the rpc timeout for read(get, scan) request
+ * and write request(put, delete).
+ */
+ TableBuilder setRpcTimeout(int timeout);
+
+ /**
+ * Set timeout for each read(get, scan) rpc request.
+ */
+ TableBuilder setReadRpcTimeout(int timeout);
+
+ /**
+ * Set timeout for each write(put, delete) rpc request.
+ */
+ TableBuilder setWriteRpcTimeout(int timeout);
+
+ /**
+ * Set the write buffer size which by default is specified by the
+ * {@code hbase.client.write.buffer} setting.
+ */
+ TableBuilder setWriteBufferSize(long writeBufferSize);
+
+ /**
+ * Create the {@link Table} instance.
+ */
+ Table build();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
new file mode 100644
index 0000000..adf1abb
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Base class for all table builders.
+ */
+@InterfaceAudience.Private
+abstract class TableBuilderBase implements TableBuilder {
+
+ protected TableName tableName;
+
+ protected int operationTimeout;
+
+ protected int rpcTimeout;
+
+ protected int readRpcTimeout;
+
+ protected int writeRpcTimeout;
+
+ protected long writeBufferSize;
+
+ TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
+ if (tableName == null) {
+ throw new IllegalArgumentException("Given table name is null");
+ }
+ this.tableName = tableName;
+ this.operationTimeout = tableName.isSystemTable() ? connConf.getMetaOperationTimeout()
+ : connConf.getOperationTimeout();
+ this.rpcTimeout = connConf.getRpcTimeout();
+ this.readRpcTimeout = connConf.getReadRpcTimeout();
+ this.writeRpcTimeout = connConf.getWriteRpcTimeout();
+ this.writeBufferSize = connConf.getWriteBufferSize();
+ }
+
+ @Override
+ public TableBuilderBase setOperationTimeout(int timeout) {
+ this.operationTimeout = timeout;
+ return this;
+ }
+
+ @Override
+ public TableBuilderBase setRpcTimeout(int timeout) {
+ this.rpcTimeout = timeout;
+ return this;
+ }
+
+ @Override
+ public TableBuilderBase setReadRpcTimeout(int timeout) {
+ this.readRpcTimeout = timeout;
+ return this;
+ }
+
+ @Override
+ public TableBuilderBase setWriteRpcTimeout(int timeout) {
+ this.writeRpcTimeout = timeout;
+ return this;
+ }
+
+ @Override
+ public TableBuilder setWriteBufferSize(long writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/07e0a30e/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
index 6ebf044..b3fdd4e 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.ExecutorService
import scala.util.Random
import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
- Connection, BufferedMutatorParams, Admin}
+ Connection, BufferedMutatorParams, Admin, TableBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.spark.Logging
@@ -50,6 +50,7 @@ class ConnectionMocker extends Connection {
def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null
def getBufferedMutator (tableName: TableName): BufferedMutator = null
def getAdmin: Admin = null
+ def getTableBuilder(tableName: TableName, pool: ExecutorService): TableBuilder = null
def close(): Unit = {
if (isClosed)