You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/08/10 17:22:39 UTC
[2/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain
protobuf references Gather up the pb references into a few locations only
rather than have pb references distributed all about the code base." This is
a revert of a revert; i.e. we are addi
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7b1547d..f460bdb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -38,41 +40,35 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;
-import com.google.protobuf.ServiceException;
-
/**
* Client proxy for SecureBulkLoadProtocol
*/
@InterfaceAudience.Private
public class SecureBulkLoadClient {
private Table table;
+ private final RpcControllerFactory rpcControllerFactory;
- public SecureBulkLoadClient(Table table) {
+ public SecureBulkLoadClient(final Configuration conf, Table table) {
this.table = table;
+ this.rpcControllerFactory = new RpcControllerFactory(conf);
}
public String prepareBulkLoad(final Connection conn) throws IOException {
try {
- RegionServerCallable<String> callable =
- new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
- @Override
- public String call(int callTimeout) throws IOException {
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- RegionSpecifier region =
- RequestConverter
- .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
- try {
- PrepareBulkLoadRequest request =
- PrepareBulkLoadRequest.newBuilder()
- .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
- .setRegion(region).build();
- PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
- return response.getBulkToken();
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
+ RegionServerCallable<String> callable = new RegionServerCallable<String>(conn,
+ this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+ @Override
+ protected String rpcCall() throws Exception {
+ byte[] regionName = getLocation().getRegionInfo().getRegionName();
+ RegionSpecifier region =
+ RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+ PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
+ .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
+ .setRegion(region).build();
+ PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
+ return response.getBulkToken();
+ }
+ };
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
@@ -82,24 +78,19 @@ public class SecureBulkLoadClient {
public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
try {
- RegionServerCallable<Void> callable =
- new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
- @Override
- public Void call(int callTimeout) throws IOException {
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- RegionSpecifier region = RequestConverter.buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- try {
- CleanupBulkLoadRequest request =
- CleanupBulkLoadRequest.newBuilder().setRegion(region)
- .setBulkToken(bulkToken).build();
- getStub().cleanupBulkLoad(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- return null;
- }
- };
+ RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+ this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+ @Override
+ protected Void rpcCall() throws Exception {
+ byte[] regionName = getLocation().getRegionInfo().getRegionName();
+ RegionSpecifier region = RequestConverter.buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName);
+ CleanupBulkLoadRequest request =
+ CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
+ getStub().cleanupBulkLoad(null, request);
+ return null;
+ }
+ };
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
.<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
@@ -130,12 +121,12 @@ public class SecureBulkLoadClient {
try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
return response.getLoaded();
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ } catch (Exception se) {
+ throw ProtobufUtil.handleRemoteException(se);
}
}
public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index 6fae5cb..a6384e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -77,5 +77,4 @@ public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
}
return response;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index f4f18b3..d9877dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -26,15 +28,26 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
/**
* Optionally carries Cells across the proxy/service interface down into ipc. On its
- * way out it optionally carries a set of result Cell data. We stick the Cells here when we want
- * to avoid having to protobuf them. This class is used ferrying data across the proxy/protobuf
- * service chasm. Used by client and server ipc'ing.
+ * way out it optionally carries a set of result Cell data. We stick the Cells here when we want
+ * to avoid having to protobuf them (for performance reasons). This class is used ferrying data
+ * across the proxy/protobuf service chasm. Also does call timeout. Used by client and server
+ * ipc'ing.
*/
@InterfaceAudience.Private
-public class PayloadCarryingRpcController
- extends TimeLimitedRpcController implements CellScannable {
+public class PayloadCarryingRpcController implements RpcController, CellScannable {
+ /**
+ * The time, in ms before the call should expire.
+ */
+ protected volatile Integer callTimeout;
+ protected volatile boolean cancelled = false;
+ protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null);
+ protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null);
+ private IOException exception;
public static final int PRIORITY_UNSET = -1;
/**
@@ -88,8 +101,8 @@ public class PayloadCarryingRpcController
* @param tn Set priority based off the table we are going against.
*/
public void setPriority(final TableName tn) {
- this.priority =
- (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS;
+ setPriority(tn != null && tn.isSystemTable()? HConstants.SYSTEMTABLE_QOS:
+ HConstants.NORMAL_QOS);
}
/**
@@ -99,9 +112,103 @@ public class PayloadCarryingRpcController
return priority;
}
- @Override public void reset() {
- super.reset();
+ @Override
+ public void reset() {
priority = 0;
cellScanner = null;
+ exception = null;
+ cancelled = false;
+ failureCb.set(null);
+ cancellationCb.set(null);
+ callTimeout = null;
+ }
+
+ public int getCallTimeout() {
+ if (callTimeout != null) {
+ return callTimeout;
+ } else {
+ return 0;
+ }
+ }
+
+ public void setCallTimeout(int callTimeout) {
+ this.callTimeout = callTimeout;
+ }
+
+ public boolean hasCallTimeout(){
+ return callTimeout != null;
+ }
+
+ @Override
+ public String errorText() {
+ if (exception != null) {
+ return exception.getMessage();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * For use in async rpc clients
+ * @return true if failed
+ */
+ @Override
+ public boolean failed() {
+ return this.exception != null;
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return cancelled;
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+ this.cancellationCb.set(cancellationCb);
+ if (this.cancelled) {
+ cancellationCb.run(null);
+ }
+ }
+
+ /**
+ * Notify a callback on error.
+ * For use in async rpc clients
+ *
+ * @param failureCb the callback to call on error
+ */
+ public void notifyOnFail(RpcCallback<IOException> failureCb) {
+ this.failureCb.set(failureCb);
+ if (this.exception != null) {
+ failureCb.run(this.exception);
+ }
+ }
+
+ @Override
+ public void setFailed(String reason) {
+ this.exception = new IOException(reason);
+ if (this.failureCb.get() != null) {
+ this.failureCb.get().run(this.exception);
+ }
+ }
+
+ /**
+ * Set failed with an exception to pass on.
+ * For use in async rpc clients
+ *
+ * @param e exception to set with
+ */
+ public void setFailed(IOException e) {
+ this.exception = e;
+ if (this.failureCb.get() != null) {
+ this.failureCb.get().run(this.exception);
+ }
+ }
+
+ @Override
+ public void startCancel() {
+ cancelled = true;
+ if (cancellationCb.get() != null) {
+ cancellationCb.get().run(null);
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 55d6375..209deed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -36,11 +36,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.util.Bytes;
/**
- * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
+ * Provides clients with an RPC connection to call Coprocessor Endpoint
+ * {@link com.google.protobuf.Service}s
* against a given table region. An instance of this class may be obtained
* by calling {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])},
- * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint
- * methods.
+ * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to
+ * call the endpoint methods.
* @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])
*/
@InterfaceAudience.Private
@@ -76,30 +77,21 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
throws IOException {
if (LOG.isTraceEnabled()) {
- LOG.trace("Call: "+method.getName()+", "+request.toString());
+ LOG.trace("Call: " + method.getName() + ", " + request.toString());
}
-
if (row == null) {
throw new IllegalArgumentException("Missing row property for remote region location");
}
-
- final RpcController rpcController = controller == null
- ? rpcControllerFactory.newController() : controller;
-
final ClientProtos.CoprocessorServiceCall call =
CoprocessorRpcUtils.buildServiceCall(row, method, request);
RegionServerCallable<CoprocessorServiceResponse> callable =
- new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+ new RegionServerCallable<CoprocessorServiceResponse>(connection,
+ controller == null? this.rpcControllerFactory.newController(): controller,
+ table, row) {
@Override
- public CoprocessorServiceResponse call(int callTimeout) throws Exception {
- if (rpcController instanceof PayloadCarryingRpcController) {
- ((PayloadCarryingRpcController) rpcController).setPriority(tableName);
- }
- if (rpcController instanceof TimeLimitedRpcController) {
- ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout);
- }
+ protected CoprocessorServiceResponse rpcCall() throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
- return ProtobufUtil.execService(rpcController, getStub(), call, regionName);
+ return ProtobufUtil.execService(getRpcController(), getStub(), call, regionName);
}
};
CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()
@@ -126,4 +118,4 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
public byte[] getLastRegion() {
return lastRegion;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
index faeca8d..4b84df1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
@@ -47,6 +47,7 @@ public class RpcControllerFactory {
}
public PayloadCarryingRpcController newController() {
+ // TODO: Set HConstants default rpc timeout here rather than nothing?
return new PayloadCarryingRpcController();
}
@@ -80,4 +81,4 @@ public class RpcControllerFactory {
return new RpcControllerFactory(configuration);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
deleted file mode 100644
index cf08ea9..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.Private
-public class TimeLimitedRpcController implements RpcController {
-
- /**
- * The time, in ms before the call should expire.
- */
- protected volatile Integer callTimeout;
- protected volatile boolean cancelled = false;
- protected final AtomicReference<RpcCallback<Object>> cancellationCb =
- new AtomicReference<>(null);
-
- protected final AtomicReference<RpcCallback<IOException>> failureCb =
- new AtomicReference<>(null);
-
- private IOException exception;
-
- public int getCallTimeout() {
- if (callTimeout != null) {
- return callTimeout;
- } else {
- return 0;
- }
- }
-
- public void setCallTimeout(int callTimeout) {
- this.callTimeout = callTimeout;
- }
-
- public boolean hasCallTimeout(){
- return callTimeout != null;
- }
-
- @Override
- public String errorText() {
- if (exception != null) {
- return exception.getMessage();
- } else {
- return null;
- }
- }
-
- /**
- * For use in async rpc clients
- * @return true if failed
- */
- @Override
- public boolean failed() {
- return this.exception != null;
- }
-
- @Override
- public boolean isCanceled() {
- return cancelled;
- }
-
- @Override
- public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
- this.cancellationCb.set(cancellationCb);
- if (this.cancelled) {
- cancellationCb.run(null);
- }
- }
-
- /**
- * Notify a callback on error.
- * For use in async rpc clients
- *
- * @param failureCb the callback to call on error
- */
- public void notifyOnFail(RpcCallback<IOException> failureCb) {
- this.failureCb.set(failureCb);
- if (this.exception != null) {
- failureCb.run(this.exception);
- }
- }
-
- @Override
- public void reset() {
- exception = null;
- cancelled = false;
- failureCb.set(null);
- cancellationCb.set(null);
- callTimeout = null;
- }
-
- @Override
- public void setFailed(String reason) {
- this.exception = new IOException(reason);
- if (this.failureCb.get() != null) {
- this.failureCb.get().run(this.exception);
- }
- }
-
- /**
- * Set failed with an exception to pass on.
- * For use in async rpc clients
- *
- * @param e exception to set with
- */
- public void setFailed(IOException e) {
- this.exception = e;
- if (this.failureCb.get() != null) {
- this.failureCb.get().run(this.exception);
- }
- }
-
- @Override
- public void startCancel() {
- cancelled = true;
- if (cancellationCb.get() != null) {
- cancellationCb.get().run(null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5ba0572..623acd5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.protobuf;
+import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
+.RegionSpecifierType.REGION_NAME;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -38,14 +41,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
-.RegionSpecifierType.REGION_NAME;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -124,8 +125,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
-import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@@ -171,11 +172,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DynamicClassLoader;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Methods;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.ArrayListMultimap;
@@ -334,17 +333,32 @@ public final class ProtobufUtil {
* a new IOException that wraps the unexpected ServiceException.
*/
public static IOException getRemoteException(ServiceException se) {
- Throwable e = se.getCause();
- if (e == null) {
- return new IOException(se);
+ return makeIOExceptionOfException(se);
+ }
+
+ /**
+ * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
+ * just {@link ServiceException}. Prefer this method to
+ * {@link #getRemoteException(ServiceException)} because trying to
+ * contain direct protobuf references.
+ * @param e
+ */
+ public static IOException handleRemoteException(Exception e) {
+ return makeIOExceptionOfException(e);
+ }
+
+ private static IOException makeIOExceptionOfException(Exception e) {
+ Throwable t = e;
+ if (e instanceof ServiceException) {
+ t = e.getCause();
}
- if (ExceptionUtil.isInterrupt(e)) {
- return ExceptionUtil.asInterrupt(e);
+ if (ExceptionUtil.isInterrupt(t)) {
+ return ExceptionUtil.asInterrupt(t);
}
- if (e instanceof RemoteException) {
- e = ((RemoteException) e).unwrapRemoteException();
+ if (t instanceof RemoteException) {
+ t = ((RemoteException)t).unwrapRemoteException();
}
- return e instanceof IOException ? (IOException) e : new IOException(se);
+ return t instanceof IOException? (IOException)t: new HBaseIOException(t);
}
/**
@@ -1252,7 +1266,6 @@ public final class ProtobufUtil {
return toMutation(type, mutation, builder, HConstants.NO_NONCE);
}
- @SuppressWarnings("deprecation")
public static MutationProto toMutation(final MutationType type, final Mutation mutation,
MutationProto.Builder builder, long nonce)
throws IOException {
@@ -2658,13 +2671,11 @@ public final class ProtobufUtil {
}
}
- @SuppressWarnings("deprecation")
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir);
}
- @SuppressWarnings("deprecation")
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName,
byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
// compaction descriptor contains relative paths.
@@ -3663,4 +3674,28 @@ public final class ProtobufUtil {
return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(),
stats.getCompactionPressure());
}
-}
+
+ /**
+ * @param msg
+ * @return A String version of the passed in <code>msg</code>
+ */
+ public static String toText(Message msg) {
+ return TextFormat.shortDebugString(msg);
+ }
+
+ public static byte [] toBytes(ByteString bs) {
+ return bs.toByteArray();
+ }
+
+ /**
+ * Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it.
+ * @throws IOException
+ */
+ public static <T> T call(Callable<T> callable) throws IOException {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 0aa9704..5959078 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -190,7 +190,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
- PayloadCarryingServerCallable callable) {
+ CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
final MultiResponse mr = createMultiResponse(
@@ -253,7 +253,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
- PayloadCarryingServerCallable callable) {
+ CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
}
@@ -290,7 +290,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
- PayloadCarryingServerCallable payloadCallable) {
+ CancellableRegionServerCallable payloadCallable) {
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index f083001..fd2a393 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -65,7 +65,6 @@ public class TestClientScanner {
RpcControllerFactory controllerFactory;
@Before
- @SuppressWarnings("deprecation")
public void setup() throws IOException {
clusterConn = Mockito.mock(ClusterConnection.class);
rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
index 9c3367e..edcbdc5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
@@ -45,4 +45,5 @@ public class HBaseIOException extends IOException {
public HBaseIOException(Throwable cause) {
super(cause);
- }}
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
index 688b51a..7e6c5d6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
@@ -73,4 +73,4 @@ public class ExceptionUtil {
private ExceptionUtil() {
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 5b2aab1..4b27924 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -83,9 +83,9 @@ import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 09dedec..a34dc0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -20,11 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
import static java.lang.String.format;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -87,12 +82,12 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
@@ -100,9 +95,13 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Tool to load the output of HFileOutputFormat into an existing table.
- * @see #usage()
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -130,11 +129,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private String bulkToken;
private UserProvider userProvider;
private int nrThreads;
+ private RpcControllerFactory rpcControllerFactory;
private LoadIncrementalHFiles() {}
public LoadIncrementalHFiles(Configuration conf) throws Exception {
super(conf);
+ this.rpcControllerFactory = new RpcControllerFactory(conf);
initialize();
}
@@ -322,7 +323,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
- SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table);
+ SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
try {
/*
@@ -473,9 +474,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
/**
* Used by the replication sink to load the hfiles from the source cluster. It does the following,
- * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
- * {@link
- * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
+ * <ol>
+ * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
+ * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+ * </li>
+ * </ol>
* @param table Table to which these hfiles should be loaded to
* @param conn Connection to use
* @param queue {@link LoadQueueItem} has hfiles yet to be loaded
@@ -776,27 +779,23 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
throws IOException {
- final List<Pair<byte[], String>> famPaths =
- new ArrayList<>(lqis.size());
+ final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
for (LoadQueueItem lqi : lqis) {
famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
}
-
- final RegionServerCallable<Boolean> svrCallable =
- new RegionServerCallable<Boolean>(conn, tableName, first) {
+ final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
+ rpcControllerFactory, tableName, first) {
@Override
- public Boolean call(int callTimeout) throws Exception {
+ protected Boolean rpcCall() throws Exception {
SecureBulkLoadClient secureClient = null;
boolean success = false;
-
try {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
byte[] regionName = getLocation().getRegionInfo().getRegionName();
try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(table);
- success =
- secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+ secureClient = new SecureBulkLoadClient(getConf(), table);
+ success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
}
return success;
@@ -1078,7 +1077,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
/**
* Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
- * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
+ * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
* property. This directory is used as a temporary directory where all files are initially
* copied/moved from user given directory, set all the required file permissions and then from
* their it is finally loaded into a table. This should be set only when, one would like to manage
@@ -1088,5 +1087,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public void setBulkToken(String stagingDir) {
this.bulkToken = stagingDir;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index a21edcc..3261bd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
-import com.google.protobuf.ServiceException;
-
/**
* The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired
* mob files.
@@ -86,10 +84,6 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
} catch (LockTimeoutException e) {
LOG.info("Fail to acquire the lock because of timeout, maybe a"
+ " MobCompactor is running", e);
- } catch (ServiceException e) {
- LOG.error(
- "Fail to clean the expired mob files for the column " + hcd.getNameAsString()
- + " in the table " + htd.getNameAsString(), e);
} catch (IOException e) {
LOG.error(
"Fail to clean the expired mob files for the column " + hcd.getNameAsString()
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index ad1a3ca..326aa00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -18,14 +18,6 @@
*/
package org.apache.hadoop.hbase.master;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
@@ -92,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.visibility.VisibilityController;
@@ -103,6 +94,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
/**
* Implements the master RPC services.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 531883a..d7ba4f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@@ -454,8 +454,7 @@ public class ServerManager {
/**
* Adds the onlineServers list. onlineServers should be locked.
* @param serverName The remote servers name.
- * @param sl
- * @return Server load from the removed server, if any.
+ * @param s
*/
@VisibleForTesting
void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
index 1499788..96ea036 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
@@ -188,7 +188,6 @@ public class TableStateManager {
return MetaTableAccessor.getTableState(master.getConnection(), tableName);
}
- @SuppressWarnings("deprecation")
public void start() throws IOException {
TableDescriptors tableDescriptors = master.getTableDescriptors();
Connection connection = master.getConnection();
@@ -220,4 +219,4 @@ public class TableStateManager {
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
index 3c965cb..d4a54bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import com.google.protobuf.ServiceException;
-
/**
* The cleaner to delete the expired MOB files.
*/
@@ -60,11 +58,8 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
* directory.
* @param tableName The current table name.
* @param family The current family.
- * @throws ServiceException
- * @throws IOException
*/
- public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
- throws ServiceException, IOException {
+ public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException {
Configuration conf = getConf();
TableName tn = TableName.valueOf(tableName);
FileSystem fs = FileSystem.get(conf);
@@ -99,7 +94,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
String tableName = args[0];
String familyName = args[1];
TableName tn = TableName.valueOf(tableName);
- HBaseAdmin.checkHBaseAvailable(getConf());
+ HBaseAdmin.available(getConf());
Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin();
try {
@@ -127,5 +122,4 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
}
}
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
index 8547c8c..c27e8ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -38,8 +38,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
-import com.google.protobuf.ServiceException;
-
/**
* The sweep tool. It deletes the mob files that are not used and merges the small mob files to
* bigger ones. Each run of this sweep tool only handles one column family. The runs on
@@ -64,10 +62,10 @@ public class Sweeper extends Configured implements Tool {
* @throws ServiceException
*/
int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
- ClassNotFoundException, KeeperException, ServiceException {
+ ClassNotFoundException, KeeperException {
Configuration conf = getConf();
// make sure the target HBase exists.
- HBaseAdmin.checkHBaseAvailable(conf);
+ HBaseAdmin.available(conf);
Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin();
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0b4ae75..89bfbf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -18,17 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
@@ -106,7 +95,6 @@ import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.http.HttpServer;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -198,6 +186,17 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
import sun.misc.Signal;
import sun.misc.SignalHandler;
@@ -206,7 +205,7 @@ import sun.misc.SignalHandler;
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-@SuppressWarnings("deprecation")
+@SuppressWarnings({ "deprecation", "restriction" })
public class HRegionServer extends HasThread implements
RegionServerServices, LastSequenceId, ConfigurationObserver {
@@ -818,9 +817,8 @@ public class HRegionServer extends HasThread implements
// when ready.
blockAndCheckIfStopped(this.clusterStatusTracker);
- if (this.initLatch != null) {
- this.initLatch.await(20, TimeUnit.SECONDS);
- }
+ doLatch(this.initLatch);
+
// Retrieve clusterId
// Since cluster status is now up
// ID should have already been set by HMaster
@@ -855,6 +853,16 @@ public class HRegionServer extends HasThread implements
this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED",
+ justification="We don't care about the return")
+ private void doLatch(final CountDownLatch latch) throws InterruptedException {
+ if (latch != null) {
+ // Result is ignored intentionally but if I remove the below, findbugs complains (the
+ // above justification on this method doesn't seem to suppress it).
+ boolean result = latch.await(20, TimeUnit.SECONDS);
+ }
+ }
+
/**
* Utilty method to wait indefinitely on a znode availability while checking
* if the region server is shut down
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 681b1dc..3859d18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -1381,8 +1380,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
family = request.getFamily().toByteArray();
store = region.getStore(family);
if (store == null) {
- throw new ServiceException(new IOException("column family " + Bytes.toString(family)
- + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
+ throw new ServiceException(new DoNotRetryIOException("column family " +
+ Bytes.toString(family) + " does not exist in region " +
+ region.getRegionInfo().getRegionNameAsString()));
}
}
if (request.hasMajor()) {
@@ -2767,12 +2767,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
timeLimitDelta =
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
- if (controller instanceof TimeLimitedRpcController) {
- TimeLimitedRpcController timeLimitedRpcController =
- (TimeLimitedRpcController)controller;
- if (timeLimitedRpcController.getCallTimeout() > 0) {
- timeLimitDelta = Math.min(timeLimitDelta,
- timeLimitedRpcController.getCallTimeout());
+ if (controller != null) {
+ if (controller instanceof PayloadCarryingRpcController) {
+ PayloadCarryingRpcController pRpcController =
+ (PayloadCarryingRpcController)controller;
+ if (pRpcController.getCallTimeout() > 0) {
+ timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
+ }
+ } else {
+ throw new UnsupportedOperationException("We only do " +
+ "PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller);
}
}
// Use half of whichever timeout value was more restrictive... But don't allow
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 3eb85bd..004581d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import com.google.protobuf.ServiceException;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -42,9 +40,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -61,10 +57,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
@InterfaceAudience.Private
public class WALEditsReplaySink {
-
private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
private static final int MAX_BATCH_SIZE = 1024;
-
private final Configuration conf;
private final ClusterConnection conn;
private final TableName tableName;
@@ -166,8 +160,8 @@ public class WALEditsReplaySink {
try {
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
ReplayServerCallable<ReplicateWALEntryResponse> callable =
- new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
- regionInfo, entries);
+ new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory,
+ this.tableName, regionLoc, entries);
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
} catch (IOException ie) {
if (skipErrors) {
@@ -184,31 +178,18 @@ public class WALEditsReplaySink {
* @param <R>
*/
class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
- private HRegionInfo regionInfo;
private List<Entry> entries;
- ReplayServerCallable(final Connection connection, final TableName tableName,
- final HRegionLocation regionLoc, final HRegionInfo regionInfo,
- final List<Entry> entries) {
- super(connection, tableName, null);
+ ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
+ final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
+ super(connection, rpcControllerFactory, tableName, null);
this.entries = entries;
- this.regionInfo = regionInfo;
setLocation(regionLoc);
}
@Override
- public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
- try {
- replayToServer(this.regionInfo, this.entries);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- return null;
- }
-
- private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
- throws IOException, ServiceException {
- if (entries.isEmpty()) return;
+ protected ReplicateWALEntryResponse rpcCall() throws Exception {
+ if (entries.isEmpty()) return null;
Entry[] entriesArray = new Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
@@ -216,12 +197,8 @@ public class WALEditsReplaySink {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
- try {
- remoteSvr.replay(controller, p.getFirst());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ setRpcControllerCellScanner(p.getSecond());
+ return remoteSvr.replay(getRpcController(), p.getFirst());
}
@Override
@@ -245,4 +222,4 @@ public class WALEditsReplaySink {
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index b0fd176..c756294 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
@@ -46,27 +45,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -74,12 +67,17 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.util.StringUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
@@ -611,9 +609,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
* Calls replay on the passed edits for the given set of entries belonging to the region. It skips
* the entry if the region boundaries have changed or the region is gone.
*/
- static class RegionReplicaReplayCallable
- extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
-
+ static class RegionReplicaReplayCallable extends
+ RegionAdminServiceCallable<ReplicateWALEntryResponse> {
private final List<Entry> entries;
private final byte[] initialEncodedRegionName;
private final AtomicLong skippedEntries;
@@ -628,38 +625,25 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
}
- @Override
- public ReplicateWALEntryResponse call(int timeout) throws IOException {
- return replayToServer(this.entries, timeout);
- }
-
- private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
- throws IOException {
- // check whether we should still replay this entry. If the regions are changed, or the
+ public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception {
+ // Check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming form the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated
boolean skip = false;
-
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
- initialEncodedRegionName)) {
+ initialEncodedRegionName)) {
skip = true;
}
- if (!entries.isEmpty() && !skip) {
- Entry[] entriesArray = new Entry[entries.size()];
- entriesArray = entries.toArray(entriesArray);
+ if (!this.entries.isEmpty() && !skip) {
+ Entry[] entriesArray = new Entry[this.entries.size()];
+ entriesArray = this.entries.toArray(entriesArray);
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
- try {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
- controller.setCallTimeout(timeout);
- controller.setPriority(tableName);
- return stub.replay(controller, p.getFirst());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ controller.setCellScanner(p.getSecond());
+ return stub.replay(controller, p.getFirst());
}
if (skip) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index 2e7cf7f..bbf858d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -1145,8 +1145,11 @@ public final class Canary implements Tool {
}
List<RegionTask> tasks = new ArrayList<RegionTask>();
try {
- for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
- tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
+ List<HRegionInfo> hris = admin.getTableRegions(tableDesc.getTableName());
+ if (hris != null) {
+ for (HRegionInfo region : hris) {
+ tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
+ }
}
} finally {
table.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
index d708edc..3c81cfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
@@ -23,19 +23,18 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -80,13 +79,11 @@ public class Merge extends Configured implements Tool {
// Verify HBase is down
LOG.info("Verifying that HBase is not running...");
try {
- HBaseAdmin.checkHBaseAvailable(getConf());
+ HBaseAdmin.available(getConf());
LOG.fatal("HBase cluster must be off-line, and is not. Aborting.");
return -1;
} catch (ZooKeeperConnectionException zkce) {
// If no zk, presume no master.
- } catch (MasterNotRunningException e) {
- // Expected. Ignore.
}
// Initialize MetaUtils and and get the root of the HBase installation
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
index d778fa9..2dca6b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
@@ -60,7 +60,6 @@ public class TestNamespace {
private static ZKNamespaceManager zkNamespaceManager;
private String prefix = "TestNamespace";
-
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
@@ -301,7 +300,8 @@ public class TestNamespace {
runWithExpectedException(new Callable<Void>() {
@Override
public Void call() throws Exception {
- HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
+ HTableDescriptor htd =
+ new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
htd.addFamily(new HColumnDescriptor("family1"));
admin.createTable(htd);
return null;
@@ -387,5 +387,4 @@ public class TestNamespace {
}
fail("Should have thrown exception " + exceptionClass);
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index d088fc4..3203636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
@@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -67,8 +65,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.protobuf.ServiceException;
-
/**
* Class to test HBaseAdmin.
@@ -335,7 +331,8 @@ public class TestAdmin2 {
@Test (timeout=300000)
public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception {
- byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion1");
+ final String name = "TestHBACloseRegion1";
+ byte[] TABLENAME = Bytes.toBytes(name);
createTableWithDefaultConf(TABLENAME);
HRegionInfo info = null;
@@ -343,7 +340,7 @@ public class TestAdmin2 {
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) {
- if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion1")) {
+ if (regionInfo.getRegionNameAsString().contains(name)) {
info = regionInfo;
try {
admin.closeRegionWithEncodedRegionName("sample", rs.getServerName()
@@ -643,11 +640,9 @@ public class TestAdmin2 {
long start = System.currentTimeMillis();
try {
- HBaseAdmin.checkHBaseAvailable(conf);
+ HBaseAdmin.available(conf);
assertTrue(false);
- } catch (MasterNotRunningException ignored) {
} catch (ZooKeeperConnectionException ignored) {
- } catch (ServiceException ignored) {
} catch (IOException ignored) {
}
long end = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 679d9c9..f49c558 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -28,13 +28,10 @@ import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
@@ -56,7 +53,6 @@ import com.google.protobuf.ServiceException;
@Category({MediumTests.class, ClientTests.class})
public class TestClientTimeouts {
- private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static int SLAVES = 1;
@@ -87,7 +83,6 @@ public class TestClientTimeouts {
*/
@Test
public void testAdminTimeout() throws Exception {
- Connection lastConnection = null;
boolean lastFailed = false;
int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
@@ -105,7 +100,7 @@ public class TestClientTimeouts {
connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
// run some admin commands
- HBaseAdmin.checkHBaseAvailable(conf);
+ HBaseAdmin.available(conf);
admin.setBalancerRunning(false, false);
} catch (ZooKeeperConnectionException ex) {
// Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index bfd16a7..bda80de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.HMaster;
@@ -84,6 +85,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Lists;
+import com.google.protobuf.RpcController;
/**
* This class is for testing HBaseConnectionManager features
@@ -104,8 +107,6 @@ public class TestHCM {
TableName.valueOf("test2");
private static final TableName TABLE_NAME3 =
TableName.valueOf("test3");
- private static final TableName TABLE_NAME4 =
- TableName.valueOf("test4");
private static final byte[] FAM_NAM = Bytes.toBytes("f");
private static final byte[] ROW = Bytes.toBytes("bbb");
private static final byte[] ROW_X = Bytes.toBytes("xxx");
@@ -525,10 +526,12 @@ public class TestHCM {
long pauseTime;
long baseTime = 100;
TableName tableName = TableName.valueOf("HCM-testCallableSleep");
- Table table = TEST_UTIL.createTable(tableName, FAM_NAM);
+ TEST_UTIL.createTable(tableName, FAM_NAM);
RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
- TEST_UTIL.getConnection(), tableName, ROW) {
- public Object call(int timeout) throws IOException {
+ TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()),
+ tableName, ROW) {
+ @Override
+ protected Object rpcCall() throws Exception {
return null;
}
};
@@ -542,9 +545,10 @@ public class TestHCM {
RegionAdminServiceCallable<Object> regionAdminServiceCallable =
new RegionAdminServiceCallable<Object>(
- (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
- TEST_UTIL.getConfiguration()), tableName, ROW) {
- public Object call(int timeout) throws IOException {
+ (ClusterConnection) TEST_UTIL.getConnection(),
+ new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) {
+ @Override
+ public Object call(PayloadCarryingRpcController controller) throws Exception {
return null;
}
};
@@ -556,16 +560,21 @@ public class TestHCM {
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
}
- MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) {
- public Object call(int timeout) throws IOException {
+ MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(),
+ new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
+ @Override
+ protected Object rpcCall() throws Exception {
return null;
}
};
-
- for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
- pauseTime = masterCallable.sleep(baseTime, i);
- assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
- assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+ try {
+ for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+ pauseTime = masterCallable.sleep(baseTime, i);
+ assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+ assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+ }
+ } finally {
+ masterCallable.close();
}
}
@@ -1267,7 +1276,6 @@ public class TestHCM {
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
try {
- long timeBase = timeMachine.currentTime();
long largeAmountOfTime = ANY_PAUSE * 1000;
ConnectionImplementation.ServerErrorTracker tracker =
new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 354f0a8..9b4e9f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -20,6 +20,15 @@
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +44,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -49,15 +59,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
@Category({MediumTests.class, ClientTests.class})
public class TestReplicaWithCluster {
private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
@@ -332,26 +333,27 @@ public class TestReplicaWithCluster {
// bulk load HFiles
LOG.debug("Loading test data");
- @SuppressWarnings("deprecation")
final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
table = conn.getTable(hdt.getTableName());
- final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
- RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
- conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
- @Override
- public Void call(int timeout) throws Exception {
- LOG.debug("Going to connect to server " + getLocation() + " for row "
+ final String bulkToken =
+ new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
+ RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+ new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(),
+ TestHRegionServerBulkLoad.rowkey(0)) {
+ @Override
+ protected Void rpcCall() throws Exception {
+ LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
- SecureBulkLoadClient secureClient = null;
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(table);
- secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- true, null, bulkToken);
- }
- return null;
+ SecureBulkLoadClient secureClient = null;
+ byte[] regionName = getLocation().getRegionInfo().getRegionName();
+ try (Table table = conn.getTable(getTableName())) {
+ secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
+ secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+ true, null, bulkToken);
}
- };
+ return null;
+ }
+ };
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
RpcRetryingCaller<Void> caller = factory.newCaller();
caller.callWithRetries(callable, 10000);
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index b3cbd33..ffe3e82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -573,11 +573,11 @@ public class TestReplicasClient {
Assert.assertTrue(((Result)r).isStale());
Assert.assertTrue(((Result)r).getExists());
}
- Set<PayloadCarryingServerCallable> set =
+ Set<CancellableRegionServerCallable> set =
((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
// verify we did cancel unneeded calls
Assert.assertTrue(!set.isEmpty());
- for (PayloadCarryingServerCallable m : set) {
+ for (CancellableRegionServerCallable m : set) {
Assert.assertTrue(m.isCancelled());
}
} finally {