You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/03/07 15:00:22 UTC
svn commit: r1575261 - in /hbase/branches/hbase-10070:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-common/src/main/java/org/apache/hadoop/hbase/util/
hbase-server/src/main/java/org/apache/hadoop/hbase/client/
hbase-server/src/main/...
Author: nkeywal
Date: Fri Mar 7 14:00:21 2014
New Revision: 1575261
URL: http://svn.apache.org/r1575261
Log:
HBASE-10355 Failover RPC's from client using region replicas
Added:
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
Modified:
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Fri Mar 7 14:00:21 2014
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -49,12 +50,9 @@ import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.ipc.RemoteException;
import org.cloudera.htrace.Trace;
import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
/**
* This class allows a continuous flow of requests. It's written to be compatible with a
@@ -95,7 +93,7 @@ class AsyncProcess {
protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
protected static final AtomicLong COUNTER = new AtomicLong();
- public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout";
+ public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
/**
* The context used to wait for results from one submit call.
@@ -180,7 +178,7 @@ class AsyncProcess {
protected int numTries;
protected int serverTrackerTimeout;
protected long clientOpTimeout;
- protected long primaryCallTimeout;
+ protected long primaryCallTimeoutMicroseconds;
// End configuration settings.
protected static class BatchErrors {
@@ -239,7 +237,7 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.clientOpTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10);
+ this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@@ -564,9 +562,9 @@ class AsyncProcess {
@Override
public void run() {
boolean done = false;
- if (primaryCallTimeout > 0) {
+ if (primaryCallTimeoutMicroseconds > 0) {
try {
- done = waitUntilDone(startTime + primaryCallTimeout);
+ done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
} catch (InterruptedException ex) {
LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
return;
@@ -875,7 +873,7 @@ class AsyncProcess {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
actionsForReplicaThread, startTime);
- if (primaryCallTimeout == 0) {
+ if (primaryCallTimeoutMicroseconds == 0) {
// Start replica calls immediately.
replicaRunnable.run();
} else {
@@ -1287,22 +1285,25 @@ class AsyncProcess {
private boolean waitUntilDone(long cutoff) throws InterruptedException {
boolean hasWait = cutoff != Long.MAX_VALUE;
- long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis();
+ long lastLog = EnvironmentEdgeManager.currentTimeMillis();
long currentInProgress;
while (0 != (currentInProgress = actionsInProgress.get())) {
- long now = 0;
- if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ if (hasWait && (now * 1000L) > cutoff) {
return false;
}
- if (!hasWait) {
- // Only log if wait is infinite.
- now = EnvironmentEdgeManager.currentTimeMillis();
+ if (!hasWait) { // Only log if wait is infinite.
if (now > lastLog + 10000) {
lastLog = now;
LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
}
- synchronized (actionsInProgress) {
- actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE));
+ }
+ synchronized (actionsInProgress) {
+ if (!hasWait) {
+ actionsInProgress.wait(100);
+ } else {
+ long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
+ TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
}
}
}
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java Fri Mar 7 14:00:21 2014
@@ -143,7 +143,17 @@ interface ClusterConnection extends HCon
final boolean useCache,
final boolean offlined) throws IOException;
-
+ /**
+ *
+ * @param tableName table to get regions of
+ * @param row the row
+ * @param useCache Should we use the cache to retrieve the region information.
+ * @param retry do we retry
+ * @return region locations for this row.
+ * @throws IOException
+ */
+ RegionLocations locateRegion(TableName tableName,
+ byte[] row, boolean useCache, boolean retry) throws IOException;
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Fri Mar 7 14:00:21 2014
@@ -1021,7 +1021,8 @@ class ConnectionManager {
}
- private RegionLocations locateRegion(final TableName tableName,
+ @Override
+ public RegionLocations locateRegion(final TableName tableName,
final byte [] row, boolean useCache, boolean retry)
throws IOException {
if (this.closed) throw new IOException(toString() + " closed");
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri Mar 7 14:00:21 2014
@@ -132,8 +132,12 @@ public class HTable implements HTableInt
private ExecutorService pool; // For Multi
private boolean closed;
private int operationTimeout;
+ private int retries;
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
+ private Consistency defaultConsistency = Consistency.STRONG;
+ private int primaryCallTimeoutMicroSecond;
+
/** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess ap;
@@ -355,6 +359,10 @@ public class HTable implements HTableInt
this.scannerCaching = this.configuration.getInt(
HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ this.primaryCallTimeoutMicroSecond =
+ this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
+ this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
// puts need to track errors globally due to how the APIs currently work.
@@ -789,16 +797,28 @@ public class HTable implements HTableInt
*/
@Override
public Result get(final Get get) throws IOException {
- RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
- getName(), get.getRow()) {
- @Override
- public Result call() throws IOException {
- return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
- }
- };
- return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
+ if (get.getConsistency() == null){
+ get.setConsistency(defaultConsistency);
+ }
+
+ if (get.getConsistency() == Consistency.STRONG) {
+ // Good old call.
+ RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
+ getName(), get.getRow()) {
+ public Result call() throws IOException {
+ return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
+ }
+ };
+ return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
+ }
+
+ // Call that takes into account the replica
+ RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
+ tableName, this.connection, get, pool, retries, operationTimeout, primaryCallTimeoutMicroSecond);
+ return callable.call();
}
+
/**
* {@inheritDoc}
*/
Added: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java?rev=1575261&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (added)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java Fri Mar 7 14:00:21 2014
@@ -0,0 +1,264 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.BoundedCompletionService;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Caller that goes to the replicas if the primary region does not answer within a configurable
+ * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
+ * the first answer. If the answer comes from one of the secondary replicas, it will
+ * be marked as stale.
+ */
+public class RpcRetryingCallerWithReadReplicas {
+ static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
+ protected final ExecutorService pool;
+ protected final ClusterConnection cConnection;
+ protected final Configuration conf;
+ protected final Get get;
+ protected final TableName tableName;
+ protected final int timeBeforeReplicas;
+ private final int callTimeout;
+ private final int retries;
+
+ public RpcRetryingCallerWithReadReplicas(TableName tableName,
+ ClusterConnection cConnection, final Get get,
+ ExecutorService pool, int retries, int callTimeout,
+ int timeBeforeReplicas) {
+ this.tableName = tableName;
+ this.cConnection = cConnection;
+ this.conf = cConnection.getConfiguration();
+ this.get = get;
+ this.pool = pool;
+ this.retries = retries;
+ this.callTimeout = callTimeout;
+ this.timeBeforeReplicas = timeBeforeReplicas;
+ }
+
+ /**
+ * A RegionServerCallable that takes into account the replicas, i.e.
+ * - the call can be on any replica
+ * - we need to stop retrying when the call is completed
+ * - we can be interrupted
+ */
+ class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
+ final int id;
+
+ public ReplicaRegionServerCallable(int id, HRegionLocation location) {
+ super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+ RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
+ this.id = id;
+ this.location = location;
+ }
+
+ /**
+ * Two responsibilities
+ * - if the call is already completed (by another replica) stops the retries.
+ * - set the location to the right region, depending on the replica.
+ */
+ @Override
+ public void prepare(final boolean reload) throws IOException {
+ if (Thread.interrupted()) {
+ throw new InterruptedIOException();
+ }
+
+ if (reload || location == null) {
+ RegionLocations rl = getRegionLocations(false);
+ location = id < rl.size() ? rl.getRegionLocation(id) : null;
+ }
+
+ if (location == null) {
+ // With this exception, there will be a retry. The location can be null for a replica
+ // when the table is created or after a split.
+ throw new HBaseIOException("There is no location for replica id #" + id);
+ }
+
+ ServerName dest = location.getServerName();
+ assert dest != null;
+
+ setStub(cConnection.getClient(dest));
+ }
+
+ @Override
+ public Result call() throws Exception {
+ if (Thread.interrupted()) {
+ throw new InterruptedIOException();
+ }
+
+ byte[] reg = location.getRegionInfo().getRegionName();
+
+ return ProtobufUtil.get(getStub(), reg, get);
+ // Ideally, we would build the object only once, but we can't right now as
+ // the protobufUtils wants the destination server.
+ }
+ }
+
+ /**
+ * Adapter to put the HBase retrying caller into a Java callable.
+ */
+ class RetryingRPC implements Callable<Result> {
+ final RetryingCallable<Result> callable;
+
+ RetryingRPC(RetryingCallable<Result> callable) {
+ this.callable = callable;
+ }
+
+ @Override
+ public Result call() throws IOException {
+ return new RpcRetryingCallerFactory(conf).<Result>newCaller().
+ callWithRetries(callable, callTimeout);
+ }
+ }
+
+ /**
+ * Algo:
+ * - we put the query into the execution pool.
+ * - after x microseconds, if we don't have a result, we add the queries for the secondary replicas
+ * - we take the first answer
+ * - when done, we cancel what's left. Cancelling means:
+ * - removing from the pool if the actual call was not started
+ * - interrupting the call if it has started
+ * Client side, we need to take into account
+ * - a call is not executed immediately after being put into the pool
+ * - a call is a thread. Let's not multiply the number of threads by the number of replicas.
+ * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call
+ * can take some i/o.
+ * <p/>
+ * Globally, the number of retries, timeout and so on still applies, but it's per replica,
+ * not global. We continue until all retries are done, or all timeouts are exceeded.
+ */
+ public synchronized Result call()
+ throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
+ RegionLocations rl = getRegionLocations(true);
+ BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
+
+ addCallsForReplica(cs, rl, 0, 0); // primary.
+
+ try {
+ Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
+ if (f == null) {
+ addCallsForReplica(cs, rl, 1, rl.size() - 1); // secondaries
+ f = cs.take();
+ }
+ return f.get();
+ } catch (ExecutionException e) {
+ throwEnrichedException(e);
+ return null; // unreachable
+ } catch (CancellationException e) {
+ throw new InterruptedIOException();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ } finally {
+ // We get here because we were interrupted or because one or more of the
+ // calls succeeded or failed. In all cases, we stop all our tasks.
+ cs.cancelAll(true);
+ }
+ }
+
+ /**
+ * Extracts the real exception from the ExecutionException, and throws whichever makes the most
+ * sense.
+ */
+ private void throwEnrichedException(ExecutionException e)
+ throws RetriesExhaustedException, DoNotRetryIOException {
+ Throwable t = e.getCause();
+ assert t != null; // That's what ExecutionException is about: holding an exception
+
+ if (t instanceof RetriesExhaustedException) {
+ throw (RetriesExhaustedException) t;
+ }
+
+ if (t instanceof DoNotRetryIOException) {
+ throw (DoNotRetryIOException) t;
+ }
+
+ RetriesExhaustedException.ThrowableWithExtraContext qt =
+ new RetriesExhaustedException.ThrowableWithExtraContext(t,
+ EnvironmentEdgeManager.currentTimeMillis(), toString());
+
+ List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
+ Collections.singletonList(qt);
+
+ throw new RetriesExhaustedException(retries, exceptions);
+ }
+
+ /**
+ * Creates the calls and submits them.
+ *
+ * @param cs - the completion service to use for submitting
+ * @param rl - the region locations
+ * @param min - the id of the first replica, inclusive
+ * @param max - the id of the last replica, inclusive.
+ */
+ private void addCallsForReplica(BoundedCompletionService<Result> cs,
+ RegionLocations rl, int min, int max) {
+ for (int id = min; id <= max; id++) {
+ HRegionLocation hrl = rl.getRegionLocation(id);
+ ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
+ RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
+ cs.submit(retryingOnReplica);
+ }
+ }
+
+ private RegionLocations getRegionLocations(boolean useCache)
+ throws RetriesExhaustedException, DoNotRetryIOException {
+ RegionLocations rl;
+ try {
+ rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
+ } catch (IOException e) {
+ if (e instanceof DoNotRetryIOException) {
+ throw (DoNotRetryIOException) e;
+ } else if (e instanceof RetriesExhaustedException) {
+ throw (RetriesExhaustedException) e;
+ } else {
+ throw new RetriesExhaustedException("Can't get the location", e);
+ }
+ }
+ if (rl == null) {
+ throw new RetriesExhaustedException("Can't get the locations");
+ }
+
+ return rl;
+ }
+}
\ No newline at end of file
Added: hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java?rev=1575261&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (added)
+++ hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java Fri Mar 7 14:00:21 2014
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A completion service, close to the one available in the JDK 1.7.
+ * However, this one keeps the list of the futures, and allows cancelling them all.
+ * This also means that it can be used for a small set of tasks only.
+ */
+public class BoundedCompletionService<V> {
+ private final Executor executor;
+ private final List<Future<V>> sent; // all the calls we sent
+ private final BlockingQueue<Future<V>> completed; // all the results we got so far.
+
+ class QueueingFuture extends FutureTask<V> {
+
+ public QueueingFuture(Callable<V> callable) {
+ super(callable);
+ }
+
+ protected void done() {
+ completed.add(QueueingFuture.this);
+ }
+ }
+
+ public BoundedCompletionService(Executor executor, int maxTasks) {
+ this.executor = executor;
+ this.sent = new ArrayList<Future<V>>(maxTasks);
+ this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
+ }
+
+
+ public Future<V> submit(Callable<V> task) {
+ QueueingFuture newFuture = new QueueingFuture(task);
+ executor.execute(newFuture);
+ sent.add(newFuture);
+ return newFuture;
+ }
+
+ public Future<V> take() throws InterruptedException{
+ return completed.take();
+ }
+
+ public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException{
+ return completed.poll(timeout, unit);
+ }
+
+ public void cancelAll(boolean interrupt) {
+ for (Future<V> future : sent) {
+ future.cancel(interrupt);
+ }
+ }
+}
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java Fri Mar 7 14:00:21 2014
@@ -325,6 +325,12 @@ class CoprocessorHConnection implements
}
@Override
+ public RegionLocations locateRegion(TableName tableName, byte[] row,
+ boolean useCache, boolean retry) throws IOException {
+ return delegate.locateRegion(tableName, row, useCache, retry);
+ }
+
+ @Override
public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
throws IOException {
return delegate.locateRegions(tableName, useCache, offlined);
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Mar 7 14:00:21 2014
@@ -211,6 +211,7 @@ import org.apache.hadoop.hbase.util.FSUt
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.Threads;
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java Fri Mar 7 14:00:21 2014
@@ -47,7 +47,7 @@ public class StorefileRefresherChore ext
/**
* The period (in milliseconds) for refreshing the store files for the secondary regions.
*/
- static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD
+ public static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD
= "hbase.regionserver.storefile.refresh.period";
static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; //disabled by default
Added: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java?rev=1575261&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java (added)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java Fri Mar 7 14:00:21 2014
@@ -0,0 +1,466 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
+ * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
+ */
+@Category(MediumTests.class)
+public class TestReplicasClient {
+ private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
+
+ private static final int NB_SERVERS = 1;
+ private static HTable table = null;
+ private static final byte[] row = TestReplicasClient.class.getName().getBytes();
+
+ private static HRegionInfo hriPrimary;
+ private static HRegionInfo hriSecondary;
+
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+ private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+ private final static int REFRESH_PERIOD = 1000;
+
+ /**
+ * This coprocessor is used to synchronize the tests: it delays gets on the primary replica.
+ */
+ public static class SlowMeCopro extends BaseRegionObserver {
+ static final AtomicLong sleepTime = new AtomicLong(0);
+ static final AtomicReference<CountDownLatch> cdl =
+ new AtomicReference<CountDownLatch>(new CountDownLatch(0));
+
+ public SlowMeCopro() {
+ }
+
+ @Override
+ public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, final List<Cell> results) throws IOException {
+
+ if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
+ CountDownLatch latch = cdl.get();
+ try {
+ if (sleepTime.get() > 0) {
+ LOG.info("Sleeping for " + sleepTime.get() + " ms");
+ Thread.sleep(sleepTime.get());
+ } else if (latch.getCount() > 0) {
+ LOG.info("Waiting for the counterCountDownLatch");
+ latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
+ if (latch.getCount() > 0) {
+ throw new RuntimeException("Can't wait more");
+ }
+ }
+ } catch (InterruptedException e1) {
+ LOG.error(e1);
+ }
+ } else {
+ LOG.info("We're not the primary replicas.");
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // enable store file refreshing
+ HTU.getConfiguration().setInt(
+ StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
+
+ HTU.startMiniCluster(NB_SERVERS);
+
+ // Create table then get the single region for our new table.
+ HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
+ hdt.addCoprocessor(SlowMeCopro.class.getName());
+ table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
+
+ hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
+
+ // mock a secondary region info to open
+ hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
+ hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
+
+ // No master
+ LOG.info("Master is going to be stopped");
+ HTU.getHBaseCluster().getMaster().stopMaster();
+ Configuration c = new Configuration(HTU.getConfiguration());
+ c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ HBaseAdmin ha = new HBaseAdmin(c);
+ for (boolean masterRuns = true; masterRuns; ) {
+ Thread.sleep(100);
+ try {
+ masterRuns = false;
+ masterRuns = ha.isMasterRunning();
+ } catch (MasterNotRunningException ignored) {
+ }
+ }
+ LOG.info("Master has stopped");
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ if (table != null) table.close();
+ HTU.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws IOException {
+ HTU.getHBaseAdmin().getConnection().clearRegionCache();
+ }
+
+ @After
+ public void after() throws IOException, KeeperException {
+ try {
+ closeRegion(hriSecondary);
+ } catch (Exception ignored) {
+ }
+ ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
+ ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
+
+ HTU.getHBaseAdmin().getConnection().clearRegionCache();
+ }
+
+ private HRegionServer getRS() {
+ return HTU.getMiniHBaseCluster().getRegionServer(0);
+ }
+
+ private void openRegion(HRegionInfo hri) throws Exception {
+ ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
+ // first version is '0'
+ AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(hri, 0, null);
+ AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
+ Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
+ Assert.assertEquals(responseOpen.getOpeningState(0),
+ AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED);
+ checkRegionIsOpened(hri);
+ }
+
+ private void closeRegion(HRegionInfo hri) throws Exception {
+ ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
+
+ AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
+ hri.getEncodedName(), true);
+ AdminProtos.CloseRegionResponse responseClose = getRS().closeRegion(null, crr);
+ Assert.assertTrue(responseClose.getClosed());
+
+ checkRegionIsClosed(hri.getEncodedName());
+
+ ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
+ }
+
+ private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
+
+ while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+ Thread.sleep(1);
+ }
+
+ Assert.assertTrue(
+ ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
+ }
+
+ private void checkRegionIsClosed(String encodedRegionName) throws Exception {
+
+ while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+ Thread.sleep(1);
+ }
+
+ try {
+ Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
+ } catch (NotServingRegionException expected) {
+ // That's how it works: if the region is closed, we get an exception.
+ }
+
+ // We don't delete the znode here, because there is not always a znode.
+ }
+
+ @Test
+ public void testUseRegionWithoutReplica() throws Exception {
+ byte[] b1 = "testUseRegionWithoutReplica".getBytes();
+ openRegion(hriSecondary);
+ SlowMeCopro.cdl.set(new CountDownLatch(0));
+ try {
+ Get g = new Get(b1);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ } finally {
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test
+ public void testLocations() throws Exception {
+ byte[] b1 = "testLocations".getBytes();
+ openRegion(hriSecondary);
+ ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection();
+
+ try {
+ hc.clearRegionCache();
+ RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
+ Assert.assertEquals(2, rl.size());
+
+ rl = hc.locateRegion(table.getName(), b1, true, false);
+ Assert.assertEquals(2, rl.size());
+
+ hc.clearRegionCache();
+ rl = hc.locateRegion(table.getName(), b1, true, false);
+ Assert.assertEquals(2, rl.size());
+
+ rl = hc.locateRegion(table.getName(), b1, false, false);
+ Assert.assertEquals(2, rl.size());
+ } finally {
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test
+ public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
+ byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
+ openRegion(hriSecondary);
+
+ try {
+ // A get works and is not stale
+ Get g = new Get(b1);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ } finally {
+ closeRegion(hriSecondary);
+ }
+ }
+
+
+ @Test
+ public void testGetNoResultStaleRegionWithReplica() throws Exception {
+ byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
+ openRegion(hriSecondary);
+
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ try {
+ Get g = new Get(b1);
+ g.setConsistency(Consistency.TIMELINE);
+ Result r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ } finally {
+ SlowMeCopro.cdl.get().countDown();
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test
+ public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
+ byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
+ openRegion(hriSecondary);
+
+ try {
+ // We sleep, but we won't go to the stale region as we don't read stale data by default.
+ SlowMeCopro.sleepTime.set(2000);
+ Get g = new Get(b1);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+
+ } finally {
+ SlowMeCopro.sleepTime.set(0);
+ closeRegion(hriSecondary);
+ }
+ }
+
+
+ @Test
+ public void testFlushTable() throws Exception {
+ openRegion(hriSecondary);
+ try {
+ HTU.getHBaseAdmin().flush(table.getTableName());
+
+ Put p = new Put(row);
+ p.add(f, row, row);
+ table.put(p);
+
+ HTU.getHBaseAdmin().flush(table.getTableName());
+ } finally {
+ Delete d = new Delete(row);
+ table.delete(d);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test
+ public void testFlushPrimary() throws Exception {
+ openRegion(hriSecondary);
+
+ try {
+ HTU.getHBaseAdmin().flush(hriPrimary.getEncodedNameAsBytes());
+
+ Put p = new Put(row);
+ p.add(f, row, row);
+ table.put(p);
+
+ HTU.getHBaseAdmin().flush(hriPrimary.getEncodedNameAsBytes());
+ } finally {
+ Delete d = new Delete(row);
+ table.delete(d);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test
+ public void testFlushSecondary() throws Exception {
+ openRegion(hriSecondary);
+ try {
+ HTU.getHBaseAdmin().flush(hriSecondary.getEncodedNameAsBytes());
+
+ Put p = new Put(row);
+ p.add(f, row, row);
+ table.put(p);
+
+ HTU.getHBaseAdmin().flush(hriSecondary.getEncodedNameAsBytes());
+ } catch (TableNotFoundException expected) {
+ } finally {
+ Delete d = new Delete(row);
+ table.delete(d);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test
+ public void testUseRegionWithReplica() throws Exception {
+ byte[] b1 = "testUseRegionWithReplica".getBytes();
+ openRegion(hriSecondary);
+
+ try {
+ // A simple put works, even if there is a second replica
+ Put p = new Put(b1);
+ p.add(f, b1, b1);
+ table.put(p);
+ LOG.info("Put done");
+
+ // A get works and is not stale
+ Get g = new Get(b1);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+ LOG.info("get works and is not stale done");
+
+ // Even if we have to wait a little on the main region
+ SlowMeCopro.sleepTime.set(2000);
+ g = new Get(b1);
+ r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+ SlowMeCopro.sleepTime.set(0);
+ LOG.info("sleep and is not stale done");
+
+ // But if we ask for stale we will get it
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ g = new Get(b1);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
+ SlowMeCopro.cdl.get().countDown();
+
+ LOG.info("stale done");
+
+ // exists works and is not stale
+ g = new Get(b1);
+ g.setCheckExistenceOnly(true);
+ r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ Assert.assertTrue(r.getExists());
+ LOG.info("exists not stale done");
+
+ // exists works on stale but doesn't see the put
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ g = new Get(b1);
+ g.setCheckExistenceOnly(true);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ Assert.assertFalse("The secondary has stale data", r.getExists());
+ SlowMeCopro.cdl.get().countDown();
+ LOG.info("exists stale before flush done");
+
+ HTU.getHBaseAdmin().flush(table.getTableName());
+ LOG.info("flush done");
+ Thread.sleep(1000 + REFRESH_PERIOD * 2);
+
+ // get works on stale and we see the put after the flush
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ g = new Get(b1);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ Assert.assertFalse(r.isEmpty());
+ SlowMeCopro.cdl.get().countDown();
+ LOG.info("stale done");
+
+ // exists works on stale and we see the put after the flush
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ g = new Get(b1);
+ g.setCheckExistenceOnly(true);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ Assert.assertTrue(r.getExists());
+ SlowMeCopro.cdl.get().countDown();
+ LOG.info("exists stale after flush done");
+
+ } finally {
+ SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.sleepTime.set(0);
+ Delete d = new Delete(b1);
+ table.delete(d);
+ closeRegion(hriSecondary);
+ }
+ }
+}
\ No newline at end of file