You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/23 20:12:18 UTC
svn commit: r1329358 [1/5] - in /hbase/trunk:
security/src/main/java/org/apache/hadoop/hbase/ipc/
src/main/java/org/apache/hadoop/hbase/catalog/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/...
Author: stack
Date: Mon Apr 23 18:12:16 2012
New Revision: 1329358
URL: http://svn.apache.org/viewvc?rev=1329358&view=rev
Log:
HBASE-5443 Convert admin protocol of HRegionInterface to PB
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java
Removed:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java
Modified:
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/SortedCopyOnWriteSet.java
hbase/trunk/src/main/protobuf/Admin.proto
hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
Modified: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (original)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java Mon Apr 23 18:12:16 2012
@@ -212,11 +212,25 @@ public class SecureRpcEngine implements
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion != clientVersion) {
- throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
+ try {
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+ clientVersion);
+ if (serverVersion != clientVersion) {
+ throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ } catch (Throwable t) {
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
+ }
+ if (t instanceof ServiceException) {
+ throw ProtobufUtil.getRemoteException((ServiceException)t);
+ }
+ if (!(t instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", t);
+ throw new IOException(t);
+ }
+ throw (IOException)t;
}
return proxy;
}
Modified: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java (original)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java Mon Apr 23 18:12:16 2012
@@ -85,7 +85,7 @@ public abstract class SecureServer exten
// 3 : Introduce the protocol into the RPC connection header
// 4 : Introduced SASL security layer
public static final byte CURRENT_VERSION = 4;
- public static final Set<Byte> INSECURE_VERSIONS = ImmutableSet.of((byte) 3);
+ public static final Set<Byte> INSECURE_VERSIONS = ImmutableSet.of((byte) 5);
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.SecureServer");
private static final Log AUDITLOG =
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java Mon Apr 23 18:12:16 2012
@@ -34,11 +34,13 @@ import org.apache.hadoop.hbase.Abortable
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
@@ -340,7 +342,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Use #getRootServerConnection(long)
*/
- public HRegionInterface waitForRootServerConnection(long timeout)
+ public AdminProtocol waitForRootServerConnection(long timeout)
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
return getRootServerConnection(timeout);
}
@@ -356,7 +358,7 @@ public class CatalogTracker {
* @throws NotAllMetaRegionsOnlineException if timed out waiting
* @throws IOException
*/
- HRegionInterface getRootServerConnection(long timeout)
+ AdminProtocol getRootServerConnection(long timeout)
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
return getCachedConnection(waitForRoot(timeout));
}
@@ -370,7 +372,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Use #getRootServerConnection(long)
*/
- public HRegionInterface waitForRootServerConnectionDefault()
+ public AdminProtocol waitForRootServerConnectionDefault()
throws NotAllMetaRegionsOnlineException, IOException {
try {
return getRootServerConnection(this.defaultTimeout);
@@ -395,11 +397,11 @@ public class CatalogTracker {
* @throws IOException
* @throws InterruptedException
*/
- private HRegionInterface getMetaServerConnection()
+ private AdminProtocol getMetaServerConnection()
throws IOException, InterruptedException {
synchronized (metaAvailable) {
if (metaAvailable.get()) {
- HRegionInterface current = getCachedConnection(this.metaLocation);
+ AdminProtocol current = getCachedConnection(this.metaLocation);
// If we are to refresh, verify we have a good connection by making
// an invocation on it.
if (verifyRegionLocation(current, this.metaLocation, META_REGION_NAME)) {
@@ -416,7 +418,7 @@ public class CatalogTracker {
ServerName newLocation = MetaReader.getMetaRegionLocation(this);
if (newLocation == null) return null;
- HRegionInterface newConnection = getCachedConnection(newLocation);
+ AdminProtocol newConnection = getCachedConnection(newLocation);
if (verifyRegionLocation(newConnection, newLocation, META_REGION_NAME)) {
setMetaLocation(newLocation);
return newConnection;
@@ -495,7 +497,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Does not retry; use an HTable instance instead.
*/
- public HRegionInterface waitForMetaServerConnection(long timeout)
+ public AdminProtocol waitForMetaServerConnection(long timeout)
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
return getCachedConnection(waitForMeta(timeout));
}
@@ -510,7 +512,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Does not retry; use an HTable instance instead.
*/
- public HRegionInterface waitForMetaServerConnectionDefault()
+ public AdminProtocol waitForMetaServerConnectionDefault()
throws NotAllMetaRegionsOnlineException, IOException {
try {
return getCachedConnection(waitForMeta(defaultTimeout));
@@ -546,19 +548,19 @@ public class CatalogTracker {
/**
* @param sn ServerName to get a connection against.
- * @return The HRegionInterface we got when we connected to <code>sn</code>
+ * @return The AdminProtocol we got when we connected to <code>sn</code>
* May have come from cache, may not be good, may have been setup by this
* invocation, or may be null.
* @throws IOException
*/
- private HRegionInterface getCachedConnection(ServerName sn)
+ private AdminProtocol getCachedConnection(ServerName sn)
throws IOException {
if (sn == null) {
return null;
}
- HRegionInterface protocol = null;
+ AdminProtocol protocol = null;
try {
- protocol = connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+ protocol = connection.getAdmin(sn.getHostname(), sn.getPort());
} catch (RetriesExhaustedException e) {
if (e.getCause() != null && e.getCause() instanceof ConnectException) {
// Catch this; presume it means the cached connection has gone bad.
@@ -599,11 +601,11 @@ public class CatalogTracker {
* the Interface.
* @throws IOException
*/
- // TODO: We should be able to get the ServerName from the HRegionInterface
+ // TODO: We should be able to get the ServerName from the AdminProtocol
// rather than have to pass it in. Its made awkward by the fact that the
// HRI is likely a proxy against remote server so the getServerName needs
// to be fixed to go to a local method or to a cache before we can do this.
- private boolean verifyRegionLocation(HRegionInterface hostingServer,
+ private boolean verifyRegionLocation(AdminProtocol hostingServer,
final ServerName address, final byte [] regionName)
throws IOException {
if (hostingServer == null) {
@@ -613,7 +615,7 @@ public class CatalogTracker {
Throwable t = null;
try {
// Try and get regioninfo from the hosting server.
- return hostingServer.getRegionInfo(regionName) != null;
+ return ProtobufUtil.getRegionInfo(hostingServer, regionName) != null;
} catch (ConnectException e) {
t = e;
} catch (RetriesExhaustedException e) {
@@ -647,7 +649,7 @@ public class CatalogTracker {
*/
public boolean verifyRootRegionLocation(final long timeout)
throws InterruptedException, IOException {
- HRegionInterface connection = null;
+ AdminProtocol connection = null;
try {
connection = waitForRootServerConnection(timeout);
} catch (NotAllMetaRegionsOnlineException e) {
@@ -672,7 +674,7 @@ public class CatalogTracker {
*/
public boolean verifyMetaRegionLocation(final long timeout)
throws InterruptedException, IOException {
- HRegionInterface connection = null;
+ AdminProtocol connection = null;
try {
connection = waitForMetaServerConnection(timeout);
} catch (NotAllMetaRegionsOnlineException e) {
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java?rev=1329358&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java Mon Apr 23 18:12:16 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that a HBase client uses to communicate with a region server.
+ */
+@KerberosInfo(
+ serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Private
+public interface AdminProtocol extends
+ AdminService.BlockingInterface, VersionedProtocol {
+ public static final long VERSION = 1L;
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java?rev=1329358&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java Mon Apr 23 18:12:16 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that a HBase client uses to communicate with a region server.
+ */
+@KerberosInfo(
+ serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ClientProtocol extends
+ ClientService.BlockingInterface, VersionedProtocol {
+ public static final long VERSION = 1L;
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Mon Apr 23 18:12:16 2012
@@ -53,13 +53,21 @@ import org.apache.hadoop.hbase.UnknownRe
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
@@ -71,6 +79,7 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
+import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
/**
@@ -1092,20 +1101,26 @@ public class HBaseAdmin implements Abort
*/
public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
final String serverName) throws IOException {
- byte[] encodedRegionNameInBytes = Bytes.toBytes(encodedRegionName);
if (null == serverName || ("").equals(serverName.trim())) {
throw new IllegalArgumentException(
"The servername cannot be null or empty.");
}
ServerName sn = new ServerName(serverName);
- HRegionInterface rs = this.connection.getHRegionConnection(
+ AdminProtocol admin = this.connection.getAdmin(
sn.getHostname(), sn.getPort());
// Close the region without updating zk state.
- boolean isRegionClosed = rs.closeRegion(encodedRegionNameInBytes, false);
- if (false == isRegionClosed) {
- LOG.error("Not able to close the region " + encodedRegionName + ".");
+ CloseRegionRequest request =
+ RequestConverter.buildCloseRegionRequest(encodedRegionName, false);
+ try {
+ CloseRegionResponse response = admin.closeRegion(null, request);
+ boolean isRegionClosed = response.getClosed();
+ if (false == isRegionClosed) {
+ LOG.error("Not able to close the region " + encodedRegionName + ".");
+ }
+ return isRegionClosed;
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
}
- return isRegionClosed;
}
/**
@@ -1117,10 +1132,10 @@ public class HBaseAdmin implements Abort
*/
public void closeRegion(final ServerName sn, final HRegionInfo hri)
throws IOException {
- HRegionInterface rs =
- this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+ AdminProtocol admin =
+ this.connection.getAdmin(sn.getHostname(), sn.getPort());
// Close the region without updating zk state.
- rs.closeRegion(hri, false);
+ ProtobufUtil.closeRegion(admin, hri.getRegionName(), false);
}
/**
@@ -1183,9 +1198,15 @@ public class HBaseAdmin implements Abort
private void flush(final ServerName sn, final HRegionInfo hri)
throws IOException {
- HRegionInterface rs =
- this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
- rs.flushRegion(hri);
+ AdminProtocol admin =
+ this.connection.getAdmin(sn.getHostname(), sn.getPort());
+ FlushRegionRequest request =
+ RequestConverter.buildFlushRegionRequest(hri.getRegionName());
+ try {
+ admin.flushRegion(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
/**
@@ -1289,9 +1310,15 @@ public class HBaseAdmin implements Abort
private void compact(final ServerName sn, final HRegionInfo hri,
final boolean major)
throws IOException {
- HRegionInterface rs =
- this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
- rs.compactRegion(hri, major);
+ AdminProtocol admin =
+ this.connection.getAdmin(sn.getHostname(), sn.getPort());
+ CompactRegionRequest request =
+ RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major);
+ try {
+ admin.compactRegion(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
/**
@@ -1471,9 +1498,15 @@ public class HBaseAdmin implements Abort
private void split(final ServerName sn, final HRegionInfo hri,
byte[] splitPoint) throws IOException {
- HRegionInterface rs =
- this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
- rs.splitRegion(hri, splitPoint);
+ AdminProtocol admin =
+ this.connection.getAdmin(sn.getHostname(), sn.getPort());
+ SplitRegionRequest request =
+ RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint);
+ try {
+ admin.splitRegion(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
/**
@@ -1572,9 +1605,15 @@ public class HBaseAdmin implements Abort
throws IOException {
String hostname = Addressing.parseHostname(hostnamePort);
int port = Addressing.parsePort(hostnamePort);
- HRegionInterface rs =
- this.connection.getHRegionConnection(hostname, port);
- rs.stop("Called by admin client " + this.connection.toString());
+ AdminProtocol admin =
+ this.connection.getAdmin(hostname, port);
+ StopServerRequest request = RequestConverter.buildStopServerRequest(
+ "Called by admin client " + this.connection.toString());
+ try {
+ admin.stopServer(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
/**
@@ -1715,9 +1754,21 @@ public class HBaseAdmin implements Abort
public synchronized byte[][] rollHLogWriter(String serverName)
throws IOException, FailedLogCloseException {
ServerName sn = new ServerName(serverName);
- HRegionInterface rs = this.connection.getHRegionConnection(
+ AdminProtocol admin = this.connection.getAdmin(
sn.getHostname(), sn.getPort());
- return rs.rollHLogWriter();
+ RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();;
+ try {
+ RollWALWriterResponse response = admin.rollWALWriter(null, request);
+ int regionCount = response.getRegionToFlushCount();
+ byte[][] regionsToFlush = new byte[regionCount][];
+ for (int i = 0; i < regionCount; i++) {
+ ByteString region = response.getRegionToFlush(i);
+ regionsToFlush[i] = region.toByteArray();
+ }
+ return regionsToFlush;
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
public String[] getMasterCoprocessors() {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Mon Apr 23 18:12:16 2012
@@ -36,11 +36,11 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@@ -201,24 +201,13 @@ public interface HConnection extends Abo
/**
* Establishes a connection to the region server at the specified address.
- * @param regionServer - the server to connect to
- * @return proxy for HRegionServer
- * @throws IOException if a remote or network exception occurs
- * @deprecated Use {@link #getHRegionConnection(String, int)}
- */
- @Deprecated
- public HRegionInterface getHRegionConnection(HServerAddress regionServer)
- throws IOException;
-
- /**
- * Establishes a connection to the region server at the specified address.
* @param hostname RegionServer hostname
* @param port RegionServer port
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*
*/
- public HRegionInterface getHRegionConnection(final String hostname, final int port)
+ public AdminProtocol getAdmin(final String hostname, final int port)
throws IOException;
/**
@@ -236,26 +225,13 @@ public interface HConnection extends Abo
/**
* Establishes a connection to the region server at the specified address.
- * @param regionServer - the server to connect to
- * @param getMaster - do we check if master is alive
- * @return proxy for HRegionServer
- * @throws IOException if a remote or network exception occurs
- * @deprecated Use {@link #getHRegionConnection(String, int)}
- */
- @Deprecated
- public HRegionInterface getHRegionConnection(HServerAddress regionServer,
- boolean getMaster)
- throws IOException;
-
- /**
- * Establishes a connection to the region server at the specified address.
* @param hostname RegionServer hostname
* @param port RegionServer port
* @param getMaster - do we check if master is alive
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
- public HRegionInterface getHRegionConnection(final String hostname,
+ public AdminProtocol getAdmin(final String hostname,
final int port, boolean getMaster)
throws IOException;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Apr 23 18:12:16 2012
@@ -66,20 +66,16 @@ import org.apache.hadoop.hbase.ServerNam
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ClientProtocol;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@@ -161,6 +157,12 @@ public class HConnectionManager {
/** Default client protocol class name. */
public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
+ /** Parameter name for what admin protocol to use. */
+ public static final String REGION_PROTOCOL_CLASS = "hbase.adminprotocol.class";
+
+ /** Default admin protocol class name. */
+ public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
+
private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
static {
@@ -507,7 +509,7 @@ public class HConnectionManager {
/* Encapsulates connection to zookeeper and regionservers.*/
static class HConnectionImplementation implements HConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
- private final Class<? extends HRegionInterface> serverInterfaceClass;
+ private final Class<? extends AdminProtocol> adminClass;
private final Class<? extends ClientProtocol> clientClass;
private final long pause;
private final int numRetries;
@@ -535,8 +537,8 @@ public class HConnectionManager {
private final Configuration conf;
- // Known region HServerAddress.toString() -> HRegionInterface
+ // Known region ServerName.toString() -> RegionClient/Admin
private final ConcurrentHashMap<String, Map<String, VersionedProtocol>> servers =
new ConcurrentHashMap<String, Map<String, VersionedProtocol>>();
private final ConcurrentHashMap<String, String> connectionLock =
@@ -576,15 +578,15 @@ public class HConnectionManager {
throws ZooKeeperConnectionException {
this.conf = conf;
this.managed = managed;
- String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
- HConstants.DEFAULT_REGION_SERVER_CLASS);
+ String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
+ DEFAULT_ADMIN_PROTOCOL_CLASS);
this.closed = false;
try {
- this.serverInterfaceClass =
- (Class<? extends HRegionInterface>) Class.forName(serverClassName);
+ this.adminClass =
+ (Class<? extends AdminProtocol>) Class.forName(adminClassName);
} catch (ClassNotFoundException e) {
throw new UnsupportedOperationException(
- "Unable to find region server interface " + serverClassName, e);
+ "Unable to find region server interface " + adminClassName, e);
}
String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
DEFAULT_CLIENT_PROTOCOL_CLASS);
@@ -730,9 +732,6 @@ public class HConnectionManager {
return getKeepAliveMaster();
} catch (MasterNotRunningException e) {
throw e;
- } catch (IOException e) {
- throw new ZooKeeperConnectionException(
- "Can't create a connection to master", e);
}
}
}
@@ -1057,8 +1056,8 @@ public class HConnectionManager {
metaLocation = locateRegion(parentTable, metaKey);
// If null still, go around again.
if (metaLocation == null) continue;
- HRegionInterface server =
- getHRegionConnection(metaLocation.getHostname(), metaLocation.getPort());
+ ClientProtocol server =
+ getClient(metaLocation.getHostname(), metaLocation.getPort());
Result regionInfoRow = null;
// This block guards against two threads trying to load the meta
@@ -1086,9 +1085,9 @@ public class HConnectionManager {
}
// Query the root or meta region for the location of the meta region
- regionInfoRow = server.getClosestRowBefore(
- metaLocation.getRegionInfo().getRegionName(), metaKey,
- HConstants.CATALOG_FAMILY);
+ regionInfoRow = ProtobufUtil.getRowOrBefore(server,
+ metaLocation.getRegionInfo().getRegionName(), metaKey,
+ HConstants.CATALOG_FAMILY);
}
if (regionInfoRow == null) {
throw new TableNotFoundException(Bytes.toString(tableName));
@@ -1340,17 +1339,9 @@ public class HConnectionManager {
}
@Override
- @Deprecated
- public HRegionInterface getHRegionConnection(HServerAddress hsa)
- throws IOException {
- return getHRegionConnection(hsa, false);
- }
-
- @Override
- public HRegionInterface getHRegionConnection(final String hostname,
- final int port)
- throws IOException {
- return getHRegionConnection(hostname, port, false);
+ public AdminProtocol getAdmin(final String hostname,
+ final int port) throws IOException {
+ return getAdmin(hostname, port, false);
}
@Override
@@ -1361,21 +1352,10 @@ public class HConnectionManager {
}
@Override
- @Deprecated
- public HRegionInterface getHRegionConnection(HServerAddress hsa,
- boolean master)
- throws IOException {
- String hostname = hsa.getInetSocketAddress().getHostName();
- int port = hsa.getInetSocketAddress().getPort();
- return getHRegionConnection(hostname, port, master);
- }
-
- @Override
- public HRegionInterface getHRegionConnection(final String hostname,
- final int port, final boolean master)
- throws IOException {
- return (HRegionInterface)getProtocol(hostname, port,
- serverInterfaceClass, HRegionInterface.VERSION);
+ public AdminProtocol getAdmin(final String hostname,
+ final int port, final boolean master) throws IOException {
+ return (AdminProtocol)getProtocol(hostname, port,
+ adminClass, AdminProtocol.VERSION);
}
/**
@@ -1591,11 +1571,19 @@ public class HConnectionManager {
}catch (InvocationTargetException e){
// We will have this for all the exception, checked on not, sent
// by any layer, including the functional exception
- if (e.getCause () == null){
+ Throwable cause = e.getCause();
+ if (cause == null){
throw new RuntimeException(
"Proxy invocation failed and getCause is null", e);
}
- throw e.getCause();
+ if (cause instanceof UndeclaredThrowableException) {
+ cause = cause.getCause();
+ }
+ if (cause instanceof ServiceException) {
+ ServiceException se = (ServiceException)cause;
+ cause = ProtobufUtil.getRemoteException(se);
+ }
+ throw cause;
}
}
}
@@ -1715,39 +1703,8 @@ public class HConnectionManager {
ServerCallable<MultiResponse> callable =
new ServerCallable<MultiResponse>(connection, tableName, null) {
public MultiResponse call() throws IOException {
- try {
- MultiResponse response = new MultiResponse();
- for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
- byte[] regionName = e.getKey();
- int rowMutations = 0;
- List<Action<R>> actions = e.getValue();
- for (Action<R> action: actions) {
- Row row = action.getAction();
- if (row instanceof RowMutations) {
- MultiRequest request =
- RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
- server.multi(null, request);
- response.add(regionName, action.getOriginalIndex(), new Result());
- rowMutations++;
- }
- }
- if (actions.size() > rowMutations) {
- MultiRequest request =
- RequestConverter.buildMultiRequest(regionName, actions);
- ClientProtos.MultiResponse
- proto = server.multi(null, request);
- List<Object> results = ResponseConverter.getResults(proto);
- for (int i = 0, n = results.size(); i < n; i++) {
- int originalIndex = actions.get(i).getOriginalIndex();
- response.add(regionName, originalIndex, results.get(i));
- }
- }
- }
- return response;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
+ return ProtobufUtil.multi(server, multi);
+ }
@Override
public void connect(boolean reload) throws IOException {
server = connection.getClient(
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Mon Apr 23 18:12:16 2012
@@ -662,15 +662,8 @@ public class HTable implements HTableInt
throws IOException {
return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
public Result call() throws IOException {
- try {
- GetRequest request = RequestConverter.buildGetRequest(
- location.getRegionInfo().getRegionName(), row, family, true);
- GetResponse response = server.get(null, request);
- if (!response.hasResult()) return null;
- return ProtobufUtil.toResult(response.getResult());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ return ProtobufUtil.getRowOrBefore(server,
+ location.getRegionInfo().getRegionName(), row, family);
}
}.withRetries();
}
@@ -715,14 +708,8 @@ public class HTable implements HTableInt
public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
- try {
- GetRequest request = RequestConverter.buildGetRequest(
- location.getRegionInfo().getRegionName(), get);
- GetResponse response = server.get(null, request);
- return ProtobufUtil.toResult(response.getResult());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ return ProtobufUtil.get(server,
+ location.getRegionInfo().getRegionName(), get);
}
}.withRetries();
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Mon Apr 23 18:12:16 2012
@@ -34,8 +34,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java Mon Apr 23 18:12:16 2012
@@ -31,9 +31,6 @@ import org.apache.hadoop.hbase.client.Se
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -80,12 +77,7 @@ public class ExecRPCInvoker implements I
new ServerCallable<ExecResult>(connection, table, row) {
public ExecResult call() throws Exception {
byte[] regionName = location.getRegionInfo().getRegionName();
- ExecCoprocessorRequest request =
- RequestConverter.buildExecCoprocessorRequest(regionName, exec);
- ExecCoprocessorResponse response =
- server.execCoprocessor(null, request);
- Object value = ProtobufUtil.toObject(response.getValue());
- return new ExecResult(regionName, value);
+ return ProtobufUtil.execCoprocessor(server, exec, regionName);
}
};
ExecResult result = callable.withRetries();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Mon Apr 23 18:12:16 2012
@@ -32,9 +32,10 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.protobuf.AdminProtocol;
-import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.VersionedWritable;
@@ -50,7 +51,6 @@ public class Invocation extends Versione
private long clientVersion;
private int clientMethodsHash;
-
// For generated protocol classes which don't have VERSION field,
// such as protobuf interfaces.
private static final Map<Class<?>, Long>
@@ -59,6 +59,8 @@ public class Invocation extends Versione
static {
PROTOCOL_VERSION.put(ClientService.BlockingInterface.class,
Long.valueOf(ClientProtocol.VERSION));
+ PROTOCOL_VERSION.put(AdminService.BlockingInterface.class,
+ Long.valueOf(AdminProtocol.VERSION));
}
// For protobuf protocols, which use ServiceException, instead of IOException
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java Mon Apr 23 18:12:16 2012
@@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import com.google.protobuf.ServiceException;
+
/** An RPC implementation. */
@InterfaceAudience.Private
interface RpcEngine {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Mon Apr 23 18:12:16 2012
@@ -28,14 +28,18 @@ import java.lang.reflect.UndeclaredThrow
import java.net.InetSocketAddress;
import java.io.*;
+import java.util.HashSet;
import java.util.Map;
import java.util.HashMap;
+import java.util.Set;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@@ -62,6 +66,15 @@ class WritableRpcEngine implements RpcEn
// DEBUG log level does NOT emit RPC-level logging.
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
+ // For protobuf protocols, which use ServiceException, instead of IOException
+ protected static final Set<Class<?>>
+ PROTOBUF_PROTOCOLS = new HashSet<Class<?>>();
+
+ static {
+ PROTOBUF_PROTOCOLS.add(ClientProtocol.class);
+ PROTOBUF_PROTOCOLS.add(AdminProtocol.class);
+ }
+
/* Cache a client using its socket factory as the hash key */
static private class ClientCache {
private Map<SocketFactory, HBaseClient> clients =
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Mon Apr 23 18:12:16 2012
@@ -72,9 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@@ -489,11 +487,7 @@ public class LoadIncrementalHFiles exten
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
- BulkLoadHFileRequest request =
- RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
- BulkLoadHFileResponse response =
- server.bulkLoadHFile(null, request);
- return response.getLoaded();
+ return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName);
}
};
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Mon Apr 23 18:12:16 2012
@@ -2051,40 +2051,36 @@ public class AssignmentManager extends Z
// This never happens. Currently regionserver close always return true.
LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
region.getRegionNameAsString());
- } catch (NotServingRegionException nsre) {
- LOG.info("Server " + server + " returned " + nsre + " for " +
- region.getRegionNameAsString());
- // Presume that master has stale data. Presume remote side just split.
- // Presume that the split message when it comes in will fix up the master's
- // in memory cluster state.
} catch (Throwable t) {
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
- if (t instanceof NotServingRegionException) {
- if (checkIfRegionBelongsToDisabling(region)) {
- // Remove from the regionsinTransition map
- LOG.info("While trying to recover the table "
- + region.getTableNameAsString()
- + " to DISABLED state the region " + region
- + " was offlined but the table was in DISABLING state");
- synchronized (this.regionsInTransition) {
- this.regionsInTransition.remove(region.getEncodedName());
- }
- // Remove from the regionsMap
- synchronized (this.regions) {
- this.regions.remove(region);
- }
- deleteClosingOrClosedNode(region);
+ }
+ if (t instanceof NotServingRegionException) {
+ // Presume that master has stale data. Presume remote side just split.
+ // Presume that the split message when it comes in will fix up the master's
+ // in memory cluster state.
+ if (checkIfRegionBelongsToDisabling(region)) {
+ // Remove from the regionsinTransition map
+ LOG.info("While trying to recover the table "
+ + region.getTableNameAsString()
+ + " to DISABLED state the region " + region
+ + " was offlined but the table was in DISABLING state");
+ synchronized (this.regionsInTransition) {
+ this.regionsInTransition.remove(region.getEncodedName());
}
+ // Remove from the regionsMap
+ synchronized (this.regions) {
+ this.regions.remove(region);
+ }
+ deleteClosingOrClosedNode(region);
}
+ } else if (t instanceof RegionAlreadyInTransitionException) {
// RS is already processing this region, only need to update the timestamp
- if (t instanceof RegionAlreadyInTransitionException) {
- LOG.debug("update " + state + " the timestamp.");
- state.update(state.getState());
- }
+ LOG.debug("update " + state + " the timestamp.");
+ state.update(state.getState());
}
LOG.info("Server " + server + " returned " + t + " for " +
- region.getEncodedName());
+ region.getRegionNameAsString());
// Presume retry or server will expire.
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Mon Apr 23 18:12:16 2012
@@ -44,13 +44,14 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
/**
@@ -81,8 +82,8 @@ public class ServerManager {
/**
* Map from full server-instance name to the RPC connection for this server.
*/
- private final Map<ServerName, HRegionInterface> serverConnections =
- new HashMap<ServerName, HRegionInterface>();
+ private final Map<ServerName, AdminProtocol> serverConnections =
+ new HashMap<ServerName, AdminProtocol>();
/**
* List of region servers <ServerName> that should not get any more new
@@ -476,14 +477,13 @@ public class ServerManager {
public RegionOpeningState sendRegionOpen(final ServerName server,
HRegionInfo region, int versionOfOfflineNode)
throws IOException {
- HRegionInterface hri = getServerConnection(server);
- if (hri == null) {
+ AdminProtocol admin = getServerConnection(server);
+ if (admin == null) {
LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
" failed because no RPC connection found to this server");
return RegionOpeningState.FAILED_OPENING;
}
- return (versionOfOfflineNode == -1) ? hri.openRegion(region) : hri
- .openRegion(region, versionOfOfflineNode);
+ return ProtobufUtil.openRegion(admin, region, versionOfOfflineNode);
}
/**
@@ -496,13 +496,13 @@ public class ServerManager {
*/
public void sendRegionOpen(ServerName server, List<HRegionInfo> regions)
throws IOException {
- HRegionInterface hri = getServerConnection(server);
- if (hri == null) {
+ AdminProtocol admin = getServerConnection(server);
+ if (admin == null) {
LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
" failed because no RPC connection found to this server");
return;
}
- hri.openRegions(regions);
+ ProtobufUtil.openRegion(admin, regions);
}
/**
@@ -521,14 +521,15 @@ public class ServerManager {
public boolean sendRegionClose(ServerName server, HRegionInfo region,
int versionOfClosingNode) throws IOException {
if (server == null) throw new NullPointerException("Passed server is null");
- HRegionInterface hri = getServerConnection(server);
- if (hri == null) {
+ AdminProtocol admin = getServerConnection(server);
+ if (admin == null) {
throw new IOException("Attempting to send CLOSE RPC to server " +
server.toString() + " for region " +
region.getRegionNameAsString() +
" failed because no RPC connection found to this server");
}
- return hri.closeRegion(region, versionOfClosingNode);
+ return ProtobufUtil.closeRegion(admin, region.getRegionName(),
+ versionOfClosingNode);
}
/**
@@ -538,15 +539,15 @@ public class ServerManager {
* @throws RetriesExhaustedException wrapping a ConnectException if failed
* putting up proxy.
*/
- private HRegionInterface getServerConnection(final ServerName sn)
+ private AdminProtocol getServerConnection(final ServerName sn)
throws IOException {
- HRegionInterface hri = this.serverConnections.get(sn);
- if (hri == null) {
+ AdminProtocol admin = this.serverConnections.get(sn.toString());
+ if (admin == null) {
LOG.debug("New connection to " + sn.toString());
- hri = this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
- this.serverConnections.put(sn, hri);
+ admin = this.connection.getAdmin(sn.getHostname(), sn.getPort());
+ this.serverConnections.put(sn, admin);
}
- return hri;
+ return admin;
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1329358&r1=1329357&r2=1329358&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Mon Apr 23 18:12:16 2012
@@ -39,24 +39,52 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UUID;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
@@ -66,10 +94,12 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
@@ -218,6 +248,27 @@ public final class ProtobufUtil {
}
/**
+ * Convert a protocol buffer ServerName to a ServerName
+ *
+ * @param proto the protocol buffer ServerName to convert
+ * @return the converted ServerName
+ */
+ public static ServerName toServerName(
+ final HBaseProtos.ServerName proto) {
+ if (proto == null) return null;
+ String hostName = proto.getHostName();
+ long startCode = -1;
+ int port = -1;
+ if (proto.hasPort()) {
+ port = proto.getPort();
+ }
+ if (proto.hasStartCode()) {
+ startCode = proto.getStartCode();
+ }
+ return new ServerName(hostName, port, startCode);
+ }
+
+ /**
* Convert a RegionInfo to a HRegionInfo
*
* @param proto the RegionInfo to convert
@@ -227,6 +278,11 @@ public final class ProtobufUtil {
toRegionInfo(final RegionInfo proto) {
if (proto == null) return null;
byte[] tableName = proto.getTableName().toByteArray();
+ if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+ return HRegionInfo.ROOT_REGIONINFO;
+ } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+ return HRegionInfo.FIRST_META_REGIONINFO;
+ }
long regionId = proto.getRegionId();
byte[] startKey = null;
byte[] endKey = null;
@@ -236,9 +292,16 @@ public final class ProtobufUtil {
if (proto.hasEndKey()) {
endKey = proto.getEndKey().toByteArray();
}
-
- return new HRegionInfo(tableName,
- startKey, endKey, false, regionId);
+ boolean split = false;
+ if (proto.hasSplit()) {
+ split = proto.getSplit();
+ }
+ HRegionInfo hri = new HRegionInfo(tableName,
+ startKey, endKey, split, regionId);
+ if (proto.hasOffline()) {
+ hri.setOffline(proto.getOffline());
+ }
+ return hri;
}
/**
@@ -259,6 +322,8 @@ public final class ProtobufUtil {
if (info.getEndKey() != null) {
builder.setEndKey(ByteString.copyFrom(info.getEndKey()));
}
+ builder.setOffline(info.isOffline());
+ builder.setSplit(info.isSplit());
return builder.build();
}
@@ -596,7 +661,7 @@ public final class ProtobufUtil {
toHLogEntries(final List<WALEntry> protoList) {
List<HLog.Entry> entries = new ArrayList<HLog.Entry>();
for (WALEntry entry: protoList) {
- WALKey walKey = entry.getWalKey();
+ WALKey walKey = entry.getKey();
java.util.UUID clusterId = HConstants.DEFAULT_CLUSTER_ID;
if (walKey.hasClusterId()) {
UUID protoUuid = walKey.getClusterId();
@@ -608,7 +673,7 @@ public final class ProtobufUtil {
walKey.getWriteTime(), clusterId);
WALEntry.WALEdit walEdit = entry.getEdit();
WALEdit edit = new WALEdit();
- for (ByteString keyValue: walEdit.getKeyValueList()) {
+ for (ByteString keyValue: walEdit.getKeyValueBytesList()) {
edit.add(new KeyValue(keyValue.toByteArray()));
}
if (walEdit.getFamilyScopeCount() > 0) {
@@ -721,4 +786,333 @@ public final class ProtobufUtil {
}
return builder.build();
}
+
+// Start helpers for Client
+
+ /**
+ * A helper to invoke a Get using client protocol.
+ *
+ * @param client
+ * @param regionName
+ * @param get
+ * @return the result of the Get
+ * @throws IOException
+ */
+ public static Result get(final ClientProtocol client,
+ final byte[] regionName, final Get get) throws IOException {
+ GetRequest request =
+ RequestConverter.buildGetRequest(regionName, get);
+ try {
+ GetResponse response = client.get(null, request);
+ return toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to get a row of the closet one before using client protocol.
+ *
+ * @param client
+ * @param regionName
+ * @param row
+ * @param family
+ * @return the row or the closestRowBefore if it doesn't exist
+ * @throws IOException
+ */
+ public static Result getRowOrBefore(final ClientProtocol client,
+ final byte[] regionName, final byte[] row,
+ final byte[] family) throws IOException {
+ GetRequest request =
+ RequestConverter.buildGetRowOrBeforeRequest(
+ regionName, row, family);
+ try {
+ GetResponse response = client.get(null, request);
+ if (!response.hasResult()) return null;
+ return toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to invoke a multi action using client protocol.
+ *
+ * @param client
+ * @param multi
+ * @return a multi response
+ * @throws IOException
+ */
+ public static <R> MultiResponse multi(final ClientProtocol client,
+ final MultiAction<R> multi) throws IOException {
+ try {
+ MultiResponse response = new MultiResponse();
+ for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
+ byte[] regionName = e.getKey();
+ int rowMutations = 0;
+ List<Action<R>> actions = e.getValue();
+ for (Action<R> action: actions) {
+ Row row = action.getAction();
+ if (row instanceof RowMutations) {
+ MultiRequest request =
+ RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
+ client.multi(null, request);
+ response.add(regionName, action.getOriginalIndex(), new Result());
+ rowMutations++;
+ }
+ }
+ if (actions.size() > rowMutations) {
+ MultiRequest request =
+ RequestConverter.buildMultiRequest(regionName, actions);
+ ClientProtos.MultiResponse
+ proto = client.multi(null, request);
+ List<Object> results = ResponseConverter.getResults(proto);
+ for (int i = 0, n = results.size(); i < n; i++) {
+ int originalIndex = actions.get(i).getOriginalIndex();
+ response.add(regionName, originalIndex, results.get(i));
+ }
+ }
+ }
+ return response;
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to bulk load a list of HFiles using client protocol.
+ *
+ * @param client
+ * @param familyPaths
+ * @param regionName
+ * @return true if all are loaded
+ * @throws IOException
+ */
+ public static boolean bulkLoadHFile(final ClientProtocol client,
+ final List<Pair<byte[], String>> familyPaths,
+ final byte[] regionName) throws IOException {
+ BulkLoadHFileRequest request =
+ RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName);
+ try {
+ BulkLoadHFileResponse response =
+ client.bulkLoadHFile(null, request);
+ return response.getLoaded();
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to exec a coprocessor Exec using client protocol.
+ *
+ * @param client
+ * @param exec
+ * @param regionName
+ * @return the exec result
+ * @throws IOException
+ */
+ public static ExecResult execCoprocessor(final ClientProtocol client,
+ final Exec exec, final byte[] regionName) throws IOException {
+ ExecCoprocessorRequest request =
+ RequestConverter.buildExecCoprocessorRequest(regionName, exec);
+ try {
+ ExecCoprocessorResponse response =
+ client.execCoprocessor(null, request);
+ Object value = ProtobufUtil.toObject(response.getValue());
+ return new ExecResult(regionName, value);
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+// End helpers for Client
+// Start helpers for Admin
+
+ /**
+ * A helper to retrieve region info given a region name
+ * using admin protocol.
+ *
+ * @param admin
+ * @param regionName
+ * @return the retrieved region info
+ * @throws IOException
+ */
+ public static HRegionInfo getRegionInfo(final AdminProtocol admin,
+ final byte[] regionName) throws IOException {
+ try {
+ GetRegionInfoRequest request =
+ RequestConverter.buildGetRegionInfoRequest(regionName);
+ GetRegionInfoResponse response =
+ admin.getRegionInfo(null, request);
+ return toRegionInfo(response.getRegionInfo());
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to close a region given a region name
+ * using admin protocol.
+ *
+ * @param admin
+ * @param regionName
+ * @param transitionInZK
+ * @throws IOException
+ */
+ public static void closeRegion(final AdminProtocol admin,
+ final byte[] regionName, final boolean transitionInZK) throws IOException {
+ CloseRegionRequest closeRegionRequest =
+ RequestConverter.buildCloseRegionRequest(regionName, transitionInZK);
+ try {
+ admin.closeRegion(null, closeRegionRequest);
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to close a region given a region name
+ * using admin protocol.
+ *
+ * @param admin
+ * @param regionName
+ * @param versionOfClosingNode
+ * @return true if the region is closed
+ * @throws IOException
+ */
+ public static boolean closeRegion(final AdminProtocol admin,
+ final byte[] regionName, final int versionOfClosingNode) throws IOException {
+ CloseRegionRequest closeRegionRequest =
+ RequestConverter.buildCloseRegionRequest(regionName, versionOfClosingNode);
+ try {
+ CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest);
+ return ResponseConverter.isClosed(response);
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to open a region using admin protocol.
+ *
+ * @param admin
+ * @param region
+ * @param versionOfOfflineNode
+ * @return the region opening state
+ * @throws IOException
+ */
+ public static RegionOpeningState openRegion(final AdminProtocol admin,
+ final HRegionInfo region, final int versionOfOfflineNode) throws IOException {
+ OpenRegionRequest request =
+ RequestConverter.buildOpenRegionRequest(region, versionOfOfflineNode);
+ try {
+ OpenRegionResponse response = admin.openRegion(null, request);
+ return ResponseConverter.getRegionOpeningState(response);
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to open a list of regions using admin protocol.
+ *
+ * @param admin
+ * @param regions
+ * @throws IOException
+ */
+ public static void openRegion(final AdminProtocol admin,
+ final List<HRegionInfo> regions) throws IOException {
+ OpenRegionRequest request =
+ RequestConverter.buildOpenRegionRequest(regions);
+ try {
+ admin.openRegion(null, request);
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to get the all the online regions on a region
+ * server using admin protocol.
+ *
+ * @param admin
+ * @return a list of online region info
+ * @throws IOException
+ */
+ public static List<HRegionInfo> getOnlineRegions(
+ final AdminProtocol admin) throws IOException {
+ GetOnlineRegionRequest request = RequestConverter.buildGetOnlineRegionRequest();
+ List<HRegionInfo> regions = null;
+ try {
+ GetOnlineRegionResponse response =
+ admin.getOnlineRegion(null, request);
+ regions = new ArrayList<HRegionInfo>();
+ for (RegionInfo regionInfo: response.getRegionInfoList()) {
+ regions.add(toRegionInfo(regionInfo));
+ }
+ return regions;
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to get the info of a region server using admin protocol.
+ *
+ * @param admin
+ * @return the server name
+ * @throws IOException
+ */
+ public static ServerName getServerInfo(
+ final AdminProtocol admin) throws IOException {
+ GetServerInfoRequest request = RequestConverter.buildGetServerInfoRequest();
+ try {
+ GetServerInfoResponse response = admin.getServerInfo(null, request);
+ return toServerName(response.getServerName());
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to replicate a list of HLog entries using admin protocol.
+ *
+ * @param admin
+ * @param entries
+ * @throws IOException
+ */
+ public static void replicateWALEntry(final AdminProtocol admin,
+ final HLog.Entry[] entries) throws IOException {
+ ReplicateWALEntryRequest request =
+ RequestConverter.buildReplicateWALEntryRequest(entries);
+ try {
+ admin.replicateWALEntry(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+ /**
+ * A helper to get the list of files of a column family
+ * on a given region using admin protocol.
+ *
+ * @param admin
+ * @param regionName
+ * @param family
+ * @return the list of store files
+ * @throws IOException
+ */
+ public static List<String> getStoreFiles(final AdminProtocol admin,
+ final byte[] regionName, final byte[] family) throws IOException {
+ GetStoreFileRequest request =
+ RequestConverter.buildGetStoreFileRequest(regionName, family);
+ try {
+ GetStoreFileResponse response = admin.getStoreFile(null, request);
+ return response.getStoreFileList();
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+// End helpers for Admin
}
\ No newline at end of file