You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/05 06:22:54 UTC
svn commit: r1334314 [3/4] - in /hbase/trunk: conf/
security/src/main/java/org/apache/hadoop/hbase/security/
src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/ ...
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1334314&r1=1334313&r2=1334314&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat May 5 04:22:52 2012
@@ -43,9 +43,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -68,21 +70,19 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientProtocol;
@@ -91,10 +91,8 @@ import org.apache.hadoop.hbase.client.Ge
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
-import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
@@ -102,24 +100,78 @@ import org.apache.hadoop.hbase.client.co
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.Invocation;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
@@ -128,7 +180,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.User;
@@ -136,6 +187,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
@@ -157,14 +209,11 @@ import org.apache.hadoop.util.StringUtil
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@@ -172,19 +221,97 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
import com.google.common.base.Function;
-import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
@InterfaceAudience.Private
-public class HRegionServer extends RegionServer
- implements HRegionInterface, HBaseRPCErrorHandler {
+@SuppressWarnings("deprecation")
+public class HRegionServer implements ClientProtocol,
+ AdminProtocol, Runnable, RegionServerServices, HBaseRPCErrorHandler {
public static final Log LOG = LogFactory.getLog(HRegionServer.class);
+ private final Random rand = new Random();
+
+ /*
+ * Strings to be used in forming the exception message for
+ * RegionsAlreadyInTransitionException.
+ */
+ protected static final String OPEN = "OPEN";
+ protected static final String CLOSE = "CLOSE";
+
+ //RegionName vs current action in progress
+ //true - if open region action in progress
+ //false - if close region action in progress
+ protected final ConcurrentSkipListMap<byte[], Boolean> regionsInTransitionInRS =
+ new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
+
+ protected long maxScannerResultSize;
+
+ // Cache flushing
+ protected MemStoreFlusher cacheFlusher;
+
+ // catalog tracker
+ protected CatalogTracker catalogTracker;
+
+ /**
+ * Go here to get table descriptors.
+ */
+ protected TableDescriptors tableDescriptors;
+
+ // Replication services. If no replication, this handler will be null.
+ protected ReplicationSourceService replicationSourceHandler;
+ protected ReplicationSinkService replicationSinkHandler;
+
+ // Compactions
+ public CompactSplitThread compactSplitThread;
+
+ final Map<String, RegionScanner> scanners =
+ new ConcurrentHashMap<String, RegionScanner>();
+
+ /**
+ * Map of regions currently being served by this region server. Key is the
+ * encoded region name. All access should be synchronized.
+ */
+ protected final Map<String, HRegion> onlineRegions =
+ new ConcurrentHashMap<String, HRegion>();
+
+ // Leases
+ protected Leases leases;
+
+ // Instance of the hbase executor service.
+ protected ExecutorService service;
+
+ // Request counter.
+ // Do we need this? Can't we just sum region counters? St.Ack 20110412
+ protected AtomicInteger requestCount = new AtomicInteger();
+
+ // If false, the file system has become unavailable
+ protected volatile boolean fsOk;
+ protected HFileSystem fs;
+
+ protected static final int NORMAL_QOS = 0;
+ protected static final int QOS_THRESHOLD = 10; // the line between low and high qos
+ protected static final int HIGH_QOS = 100;
+
+ // Set when a report to the master comes back with a message asking us to
+ // shutdown. Also set by call to stop when debugging or running unit tests
+ // of HRegionServer in isolation.
+ protected volatile boolean stopped = false;
+
+ // Go down hard. Used if file system becomes unavailable and also in
+ // debugging and unit tests.
+ protected volatile boolean abortRequested;
+
+ // Port we put up the webui on.
+ protected int webuiport = -1;
+
+ Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
+
// A state before we go into stopped state. At this stage we're closing user
// space regions.
private boolean stopping = false;
@@ -231,7 +358,6 @@ public class HRegionServer extends Regio
private RegionServerMetrics metrics;
- @SuppressWarnings("unused")
private RegionServerDynamicMetrics dynamicMetrics;
/*
@@ -280,9 +406,6 @@ public class HRegionServer extends Regio
*/
private ServerName serverNameFromMasterPOV;
- // Port we put up the webui on.
- private int webuiport = -1;
-
/**
* This servers startcode.
*/
@@ -349,7 +472,7 @@ public class HRegionServer extends Regio
}
this.rpcServer = HBaseRPC.getServer(this,
- new Class<?>[]{HRegionInterface.class, ClientProtocol.class,
+ new Class<?>[]{ClientProtocol.class,
AdminProtocol.class, HBaseRPCErrorHandler.class,
OnlineRegions.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server.
@@ -582,7 +705,7 @@ public class HRegionServer extends Regio
// Create the thread for the ThriftServer.
if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
- thriftServer = new HRegionThriftServer((RegionServer)this, conf);
+ thriftServer = new HRegionThriftServer(this, conf);
thriftServer.start();
LOG.info("Started Thrift API from Region Server.");
}
@@ -591,7 +714,6 @@ public class HRegionServer extends Regio
/**
* The HRegionServer sticks in this loop until closed.
*/
- @SuppressWarnings("deprecation")
public void run() {
try {
// Do pre-registration initializations; zookeeper, lease threads, etc.
@@ -1834,1581 +1956,1841 @@ public class HRegionServer extends Regio
}
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public HRegionInfo getRegionInfo(final byte[] regionName)
- throws NotServingRegionException, IOException {
- checkOpen();
- requestCount.incrementAndGet();
- return getRegion(regionName).getRegionInfo();
- }
-
- public Result getClosestRowBefore(final byte[] regionName, final byte[] row,
- final byte[] family) throws IOException {
- checkOpen();
- requestCount.incrementAndGet();
- try {
- // locate the region we're operating on
- HRegion region = getRegion(regionName);
- // ask the region for all the data
-
- Result r = region.getClosestRowBefore(row, family);
- return r;
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- }
+ /** @return the info server */
+ public InfoServer getInfoServer() {
+ return infoServer;
}
- /** {@inheritDoc} */
- public Result get(byte[] regionName, Get get) throws IOException {
- checkOpen();
- final long startTime = System.nanoTime();
- requestCount.incrementAndGet();
- try {
- HRegion region = getRegion(regionName);
- return region.get(get, getLockFromId(get.getLockId()));
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- } finally {
- this.metrics.getLatencies.update(System.nanoTime() - startTime);
- }
+ /**
+ * @return true if a stop has been requested.
+ */
+ public boolean isStopped() {
+ return this.stopped;
}
- public boolean exists(byte[] regionName, Get get) throws IOException {
- checkOpen();
- requestCount.incrementAndGet();
- try {
- HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(get.getLockId());
- if (region.getCoprocessorHost() != null) {
- Boolean result = region.getCoprocessorHost().preExists(get);
- if (result != null) {
- return result.booleanValue();
- }
- }
- Result r = region.get(get, lock);
- boolean result = r != null && !r.isEmpty();
- if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postExists(get, result);
- }
- return result;
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- }
+ @Override
+ public boolean isStopping() {
+ return this.stopping;
}
- public void put(final byte[] regionName, final Put put) throws IOException {
- if (put.getRow() == null) {
- throw new IllegalArgumentException("update has null row");
- }
-
- final long startTime = System.nanoTime();
- checkOpen();
- this.requestCount.incrementAndGet();
- HRegion region = getRegion(regionName);
- try {
- if (!region.getRegionInfo().isMetaTable()) {
- this.cacheFlusher.reclaimMemStoreMemory();
- }
- boolean writeToWAL = put.getWriteToWAL();
- region.put(put, getLockFromId(put.getLockId()), writeToWAL);
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- } finally {
- this.metrics.putLatencies.update(System.nanoTime() - startTime);
- }
+ /**
+ *
+ * @return the configuration
+ */
+ public Configuration getConfiguration() {
+ return conf;
}
- public int put(final byte[] regionName, final List<Put> puts)
- throws IOException {
- checkOpen();
- HRegion region = null;
- int i = 0;
-
- final long startTime = System.nanoTime();
- try {
- region = getRegion(regionName);
- if (!region.getRegionInfo().isMetaTable()) {
- this.cacheFlusher.reclaimMemStoreMemory();
- }
-
- @SuppressWarnings("unchecked")
- Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
-
- for (Put p : puts) {
- Integer lock = getLockFromId(p.getLockId());
- putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
- }
-
- this.requestCount.addAndGet(puts.size());
- OperationStatus codes[] = region.put(putsWithLocks);
- for (i = 0; i < codes.length; i++) {
- if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
- return i;
- }
- }
- return -1;
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- } finally {
- // going to count this as puts.size() PUTs for latency calculations
- final long totalTime = System.nanoTime() - startTime;
- final long putCount = i;
- final long perPutTime = totalTime / putCount;
- for (int request = 0; request < putCount; request++) {
- this.metrics.putLatencies.update(perPutTime);
- }
- }
+ /** @return the write lock for the server */
+ ReentrantReadWriteLock.WriteLock getWriteLock() {
+ return lock.writeLock();
}
- private boolean checkAndMutate(final byte[] regionName, final byte[] row,
- final byte[] family, final byte[] qualifier, final CompareOp compareOp,
- final WritableByteArrayComparable comparator, final Writable w,
- Integer lock) throws IOException {
- checkOpen();
- this.requestCount.incrementAndGet();
- HRegion region = getRegion(regionName);
- try {
- if (!region.getRegionInfo().isMetaTable()) {
- this.cacheFlusher.reclaimMemStoreMemory();
- }
- return region.checkAndMutate(row, family, qualifier, compareOp,
- comparator, w, lock, true);
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- }
+ public int getNumberOfOnlineRegions() {
+ return this.onlineRegions.size();
}
- /**
- *
- * @param regionName
- * @param row
- * @param family
- * @param qualifier
- * @param value
- * the expected value
- * @param put
- * @throws IOException
- * @return true if the new put was execute, false otherwise
- */
- public boolean checkAndPut(final byte[] regionName, final byte[] row,
- final byte[] family, final byte[] qualifier, final byte[] value,
- final Put put) throws IOException {
- checkOpen();
- if (regionName == null) {
- throw new IOException("Invalid arguments to checkAndPut "
- + "regionName is null");
- }
- HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(put.getLockId());
- WritableByteArrayComparable comparator = new BinaryComparator(value);
- if (region.getCoprocessorHost() != null) {
- Boolean result = region.getCoprocessorHost()
- .preCheckAndPut(row, family, qualifier, CompareOp.EQUAL, comparator,
- put);
- if (result != null) {
- return result.booleanValue();
- }
- }
- boolean result = checkAndMutate(regionName, row, family, qualifier,
- CompareOp.EQUAL, new BinaryComparator(value), put,
- lock);
- if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndPut(row, family,
- qualifier, CompareOp.EQUAL, comparator, put, result);
- }
- return result;
+ boolean isOnlineRegionsEmpty() {
+ return this.onlineRegions.isEmpty();
}
/**
- *
- * @param regionName
- * @param row
- * @param family
- * @param qualifier
- * @param compareOp
- * @param comparator
- * @param put
+ * @param encodedRegionName
+ * @return JSON Map of labels to values for passed in <code>encodedRegionName</code>
* @throws IOException
- * @return true if the new put was execute, false otherwise
*/
- public boolean checkAndPut(final byte[] regionName, final byte[] row,
- final byte[] family, final byte[] qualifier, final CompareOp compareOp,
- final WritableByteArrayComparable comparator, final Put put)
- throws IOException {
- checkOpen();
- if (regionName == null) {
- throw new IOException("Invalid arguments to checkAndPut "
- + "regionName is null");
+ public byte [] getRegionStats(final String encodedRegionName)
+ throws IOException {
+ HRegion r = null;
+ synchronized (this.onlineRegions) {
+ r = this.onlineRegions.get(encodedRegionName);
}
- HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(put.getLockId());
- if (region.getCoprocessorHost() != null) {
- Boolean result = region.getCoprocessorHost()
- .preCheckAndPut(row, family, qualifier, compareOp, comparator, put);
- if (result != null) {
- return result.booleanValue();
+ if (r == null) return null;
+ ObjectMapper mapper = new ObjectMapper();
+ int stores = 0;
+ int storefiles = 0;
+ int storefileSizeMB = 0;
+ int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
+ int storefileIndexSizeMB = 0;
+ synchronized (r.stores) {
+ stores += r.stores.size();
+ for (Store store : r.stores.values()) {
+ storefiles += store.getStorefilesCount();
+ storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
+ storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
}
}
- boolean result = checkAndMutate(regionName, row, family, qualifier,
- compareOp, comparator, put, lock);
- if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndPut(row, family,
- qualifier, compareOp, comparator, put, result);
- }
- return result;
+ Map<String, Integer> map = new TreeMap<String, Integer>();
+ map.put("stores", stores);
+ map.put("storefiles", storefiles);
+ map.put("storefileSizeMB", storefileSizeMB);
+ map.put("storefileIndexSizeMB", storefileIndexSizeMB);
+ map.put("memstoreSizeMB", memstoreSizeMB);
+ StringWriter w = new StringWriter();
+ mapper.writeValue(w, map);
+ w.close();
+ return Bytes.toBytes(w.toString());
}
/**
- *
- * @param regionName
- * @param row
- * @param family
- * @param qualifier
- * @param value
- * the expected value
- * @param delete
- * @throws IOException
- * @return true if the new put was execute, false otherwise
+ * For tests and web ui.
+ * This method will only work if HRegionServer is in the same JVM as client;
+ * HRegion cannot be serialized to cross an rpc.
+ * @see #getOnlineRegions()
*/
- public boolean checkAndDelete(final byte[] regionName, final byte[] row,
- final byte[] family, final byte[] qualifier, final byte[] value,
- final Delete delete) throws IOException {
- checkOpen();
-
- if (regionName == null) {
- throw new IOException("Invalid arguments to checkAndDelete "
- + "regionName is null");
- }
- HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(delete.getLockId());
- WritableByteArrayComparable comparator = new BinaryComparator(value);
- if (region.getCoprocessorHost() != null) {
- Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
- family, qualifier, CompareOp.EQUAL, comparator, delete);
- if (result != null) {
- return result.booleanValue();
- }
- }
- boolean result = checkAndMutate(regionName, row, family, qualifier,
- CompareOp.EQUAL, comparator, delete, lock);
- if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndDelete(row, family,
- qualifier, CompareOp.EQUAL, comparator, delete, result);
- }
- return result;
+ public Collection<HRegion> getOnlineRegionsLocalContext() {
+ Collection<HRegion> regions = this.onlineRegions.values();
+ return Collections.unmodifiableCollection(regions);
}
@Override
- public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
- throws IllegalArgumentException {
- return getStoreFileList(regionName, new byte[][]{columnFamily});
+ public void addToOnlineRegions(HRegion region) {
+ this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
}
@Override
- public List<String> getStoreFileList(byte[] regionName, byte[][] columnFamilies)
- throws IllegalArgumentException {
- HRegion region = getOnlineRegion(regionName);
- if (region == null) {
- throw new IllegalArgumentException("No region: " + new String(regionName)
- + " available");
- }
- return region.getStoreFileList(columnFamilies);
+ public boolean removeFromOnlineRegions(final String encodedName) {
+ HRegion toReturn = null;
+ toReturn = this.onlineRegions.remove(encodedName);
+
+ //Clear all of the dynamic metrics as they are now probably useless.
+ //This is a clear because dynamic metrics could include metrics per cf and
+ //per hfile. Figuring out which cfs, hfiles, and regions are still relevant to
+ //this region server would be an onerous task. Instead just clear everything
+ //and on the next tick of the metrics everything that is still relevant will be
+ //re-added.
+ this.dynamicMetrics.clear();
+ return toReturn != null;
}
- public List<String> getStoreFileList(byte[] regionName)
- throws IllegalArgumentException {
- HRegion region = getOnlineRegion(regionName);
- if (region == null) {
- throw new IllegalArgumentException("No region: " + new String(regionName)
- + " available");
- }
- Set<byte[]> columnFamilies = region.getStores().keySet();
- int nCF = columnFamilies.size();
- return region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
- }
-
- /**
- * Flushes the given region
- */
- public void flushRegion(byte[] regionName)
- throws IllegalArgumentException, IOException {
- HRegion region = getOnlineRegion(regionName);
- if (region == null) {
- throw new IllegalArgumentException("No region : " + new String(regionName)
- + " available");
+ /**
+ * @return A new Map of online regions sorted by region size with the first
+ * entry being the biggest.
+ */
+ public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
+ // we'll sort the regions in reverse
+ SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
+ new Comparator<Long>() {
+ public int compare(Long a, Long b) {
+ return -1 * a.compareTo(b);
+ }
+ });
+ // Copy over all regions. Regions are sorted by size with biggest first.
+ for (HRegion region : this.onlineRegions.values()) {
+ sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
}
- region.flushcache();
+ return sortedRegions;
}
- /**
- * Flushes the given region if lastFlushTime < ifOlderThanTS
- */
- public void flushRegion(byte[] regionName, long ifOlderThanTS)
- throws IllegalArgumentException, IOException {
- HRegion region = getOnlineRegion(regionName);
- if (region == null) {
- throw new IllegalArgumentException("No region : " + new String(regionName)
- + " available");
- }
- if (region.getLastFlushTime() < ifOlderThanTS) region.flushcache();
- }
+ /** @return the request count */
+ public AtomicInteger getRequestCount() {
+ return this.requestCount;
+ }
/**
- * Gets last flush time for the given region
- * @return the last flush time for a region
+ * @return time stamp in millis of when this region server was started
*/
- public long getLastFlushTime(byte[] regionName) {
- HRegion region = getOnlineRegion(regionName);
- if (region == null) {
- throw new IllegalArgumentException("No region : " + new String(regionName)
- + " available");
- }
- return region.getLastFlushTime();
+ public long getStartcode() {
+ return this.startcode;
}
-
- /**
- *
- * @param regionName
- * @param row
- * @param family
- * @param qualifier
- * @param compareOp
- * @param comparator
- * @param delete
- * @throws IOException
- * @return true if the new put was execute, false otherwise
- */
- public boolean checkAndDelete(final byte[] regionName, final byte[] row,
- final byte[] family, final byte[] qualifier, final CompareOp compareOp,
- final WritableByteArrayComparable comparator, final Delete delete)
- throws IOException {
- checkOpen();
-
- if (regionName == null) {
- throw new IOException("Invalid arguments to checkAndDelete "
- + "regionName is null");
- }
- HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(delete.getLockId());
- if (region.getCoprocessorHost() != null) {
- Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
- family, qualifier, compareOp, comparator, delete);
- if (result != null) {
- return result.booleanValue();
- }
- }
- boolean result = checkAndMutate(regionName, row, family, qualifier,
- compareOp, comparator, delete, lock);
- if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndDelete(row, family,
- qualifier, compareOp, comparator, delete, result);
- }
- return result;
- }
- //
- // remote scanner interface
- //
+ /** @return reference to FlushRequester */
+ public FlushRequester getFlushRequester() {
+ return this.cacheFlusher;
+ }
- public long openScanner(byte[] regionName, Scan scan) throws IOException {
- checkOpen();
- NullPointerException npe = null;
- if (regionName == null) {
- npe = new NullPointerException("regionName is null");
- } else if (scan == null) {
- npe = new NullPointerException("scan is null");
- }
- if (npe != null) {
- throw new IOException("Invalid arguments to openScanner", npe);
- }
- requestCount.incrementAndGet();
- try {
- HRegion r = getRegion(regionName);
- r.checkRow(scan.getStartRow(), "Scan");
- r.prepareScanner(scan);
- RegionScanner s = null;
- if (r.getCoprocessorHost() != null) {
- s = r.getCoprocessorHost().preScannerOpen(scan);
- }
- if (s == null) {
- s = r.getScanner(scan);
- }
- if (r.getCoprocessorHost() != null) {
- RegionScanner savedScanner = r.getCoprocessorHost().postScannerOpen(
- scan, s);
- if (savedScanner == null) {
- LOG.warn("PostScannerOpen impl returning null. "
- + "Check the RegionObserver implementation.");
- } else {
- s = savedScanner;
- }
+ /**
+ * Get the top N most loaded regions this server is serving so we can tell the
+ * master which regions it can reallocate if we're overloaded. TODO: actually
+ * calculate which regions are most loaded. (Right now, we're just grabbing
+ * the first N regions being served regardless of load.)
+ */
+ protected HRegionInfo[] getMostLoadedRegions() {
+ ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+ for (HRegion r : onlineRegions.values()) {
+ if (!r.isAvailable()) {
+ continue;
+ }
+ if (regions.size() < numRegionsToReport) {
+ regions.add(r.getRegionInfo());
+ } else {
+ break;
}
- return addScanner(s);
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t, "Failed openScanner"));
}
+ return regions.toArray(new HRegionInfo[regions.size()]);
}
- public Result next(final long scannerId) throws IOException {
- Result[] res = next(scannerId, 1);
- if (res == null || res.length == 0) {
- return null;
+ @Override
+ @QosPriority(priority=HIGH_QOS)
+ public ProtocolSignature getProtocolSignature(
+ String protocol, long version, int clientMethodsHashCode)
+ throws IOException {
+ if (protocol.equals(ClientProtocol.class.getName())) {
+ return new ProtocolSignature(ClientProtocol.VERSION, null);
+ } else if (protocol.equals(AdminProtocol.class.getName())) {
+ return new ProtocolSignature(AdminProtocol.VERSION, null);
}
- return res[0];
+ throw new IOException("Unknown protocol: " + protocol);
}
- public Result[] next(final long scannerId, int nbRows) throws IOException {
- String scannerName = String.valueOf(scannerId);
- RegionScanner s = this.scanners.get(scannerName);
- if (s == null) throw new UnknownScannerException("Name: " + scannerName);
- try {
- checkOpen();
- } catch (IOException e) {
- // If checkOpen failed, server not running or filesystem gone,
- // cancel this lease; filesystem is gone or we're closing or something.
- try {
- this.leases.cancelLease(scannerName);
- } catch (LeaseException le) {
- LOG.info("Server shutting down and client tried to access missing scanner " +
- scannerName);
- }
- throw e;
+ @Override
+ @QosPriority(priority=HIGH_QOS)
+ public long getProtocolVersion(final String protocol, final long clientVersion)
+ throws IOException {
+ if (protocol.equals(ClientProtocol.class.getName())) {
+ return ClientProtocol.VERSION;
+ } else if (protocol.equals(AdminProtocol.class.getName())) {
+ return AdminProtocol.VERSION;
}
- Leases.Lease lease = null;
- try {
- // Remove lease while its being processed in server; protects against case
- // where processing of request takes > lease expiration time.
- lease = this.leases.removeLease(scannerName);
- List<Result> results = new ArrayList<Result>(nbRows);
- long currentScanResultSize = 0;
- List<KeyValue> values = new ArrayList<KeyValue>();
-
- // Call coprocessor. Get region info from scanner.
- HRegion region = getRegion(s.getRegionInfo().getRegionName());
- if (region != null && region.getCoprocessorHost() != null) {
- Boolean bypass = region.getCoprocessorHost().preScannerNext(s,
- results, nbRows);
- if (!results.isEmpty()) {
- for (Result r : results) {
- for (KeyValue kv : r.raw()) {
- currentScanResultSize += kv.heapSize();
- }
- }
- }
- if (bypass != null) {
- return s.isFilterDone() && results.isEmpty() ? null
- : results.toArray(new Result[0]);
- }
- }
- long maxResultSize;
- if (s.getMaxResultSize() > 0) {
- maxResultSize = s.getMaxResultSize();
- } else {
- maxResultSize = maxScannerResultSize;
- }
- for (int i = 0; i < nbRows && currentScanResultSize < maxResultSize; i++) {
- requestCount.incrementAndGet();
- // Collect values to be returned here
- boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
- if (!values.isEmpty()) {
- for (KeyValue kv : values) {
- currentScanResultSize += kv.heapSize();
- }
- results.add(new Result(values));
- }
- if (!moreRows) {
- break;
- }
- values.clear();
- }
+ throw new IOException("Unknown protocol: " + protocol);
+ }
- // coprocessor postNext hook
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
- }
+ /**
+ * @return Return the leases.
+ */
+ protected Leases getLeases() {
+ return leases;
+ }
- // If the scanner's filter - if any - is done with the scan
- // and wants to tell the client to stop the scan. This is done by passing
- // a null result.
- return s.isFilterDone() && results.isEmpty() ? null
- : results.toArray(new Result[0]);
- } catch (Throwable t) {
- if (t instanceof NotServingRegionException) {
- this.scanners.remove(scannerName);
- }
- throw convertThrowableToIOE(cleanup(t));
- } finally {
- // We're done. On way out readd the above removed lease. Adding resets
- // expiration time on lease.
- if (this.scanners.containsKey(scannerName)) {
- if (lease != null) this.leases.addLease(lease);
- }
- }
+ /**
+ * @return Return the rootDir.
+ */
+ protected Path getRootDir() {
+ return rootDir;
}
- public void close(final long scannerId) throws IOException {
- try {
- checkOpen();
- requestCount.incrementAndGet();
- String scannerName = String.valueOf(scannerId);
- RegionScanner s = scanners.get(scannerName);
+ /**
+ * @return Return the fs.
+ */
+ public FileSystem getFileSystem() {
+ return fs;
+ }
- HRegion region = null;
- if (s != null) {
- // call coprocessor.
- region = getRegion(s.getRegionInfo().getRegionName());
- if (region != null && region.getCoprocessorHost() != null) {
- if (region.getCoprocessorHost().preScannerClose(s)) {
- return; // bypass
- }
- }
- }
+ public String toString() {
+ return getServerName().toString();
+ }
- s = scanners.remove(scannerName);
- if (s != null) {
- s.close();
- this.leases.cancelLease(scannerName);
+ /**
+ * Interval at which threads should run
+ *
+ * @return the interval
+ */
+ public int getThreadWakeFrequency() {
+ return threadWakeFrequency;
+ }
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerClose(s);
- }
- }
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- }
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return zooKeeper;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ // Our servername could change after we talk to the master.
+ return this.serverNameFromMasterPOV == null?
+ new ServerName(this.isa.getHostName(), this.isa.getPort(), this.startcode):
+ this.serverNameFromMasterPOV;
+ }
+
+ @Override
+ public CompactionRequestor getCompactionRequester() {
+ return this.compactSplitThread;
+ }
+
+ public ZooKeeperWatcher getZooKeeperWatcher() {
+ return this.zooKeeper;
+ }
+
+
+ public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
+ return this.regionsInTransitionInRS;
+ }
+
+ public ExecutorService getExecutorService() {
+ return service;
}
//
- // Methods that do the actual work for the remote API
+ // Main program and support routines
//
- public void delete(final byte[] regionName, final Delete delete)
- throws IOException {
- checkOpen();
- final long startTime = System.nanoTime();
- try {
- boolean writeToWAL = delete.getWriteToWAL();
- this.requestCount.incrementAndGet();
- HRegion region = getRegion(regionName);
- if (!region.getRegionInfo().isMetaTable()) {
- this.cacheFlusher.reclaimMemStoreMemory();
- }
- Integer lid = getLockFromId(delete.getLockId());
- region.delete(delete, lid, writeToWAL);
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- } finally {
- this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
+
+ /**
+ * Load the replication service objects, if any
+ */
+ static private void createNewReplicationInstance(Configuration conf,
+ HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
+
+ // If replication is not enabled, then return immediately.
+ if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+ return;
}
- }
- public int delete(final byte[] regionName, final List<Delete> deletes)
- throws IOException {
- checkOpen();
- // Count of Deletes processed.
- int i = 0;
- HRegion region = null;
- try {
- region = getRegion(regionName);
- if (!region.getRegionInfo().isMetaTable()) {
- this.cacheFlusher.reclaimMemStoreMemory();
- }
- int size = deletes.size();
- Integer[] locks = new Integer[size];
- for (Delete delete : deletes) {
- final long startTime = System.nanoTime();
- this.requestCount.incrementAndGet();
- locks[i] = getLockFromId(delete.getLockId());
- region.delete(delete, locks[i], delete.getWriteToWAL());
- i++;
- this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
- }
- } catch (WrongRegionException ex) {
- LOG.debug("Batch deletes: " + i, ex);
- return i;
- } catch (NotServingRegionException ex) {
- return i;
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
+ // read in the name of the source replication class from the config file.
+ String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
+ HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+ // read in the name of the sink replication class from the config file.
+ String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+ HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+ // If both the sink and the source class names are the same, then instantiate
+ // only one object.
+ if (sourceClassname.equals(sinkClassname)) {
+ server.replicationSourceHandler = (ReplicationSourceService)
+ newReplicationInstance(sourceClassname,
+ conf, server, fs, logDir, oldLogDir);
+ server.replicationSinkHandler = (ReplicationSinkService)
+ server.replicationSourceHandler;
+ }
+ else {
+ server.replicationSourceHandler = (ReplicationSourceService)
+ newReplicationInstance(sourceClassname,
+ conf, server, fs, logDir, oldLogDir);
+ server.replicationSinkHandler = (ReplicationSinkService)
+ newReplicationInstance(sinkClassname,
+ conf, server, fs, logDir, oldLogDir);
}
- return -1;
}
- public long lockRow(byte[] regionName, byte[] row) throws IOException {
- checkOpen();
- NullPointerException npe = null;
- if (regionName == null) {
- npe = new NullPointerException("regionName is null");
- } else if (row == null) {
- npe = new NullPointerException("row to lock is null");
- }
- if (npe != null) {
- IOException io = new IOException("Invalid arguments to lockRow");
- io.initCause(npe);
- throw io;
- }
- requestCount.incrementAndGet();
+ static private ReplicationService newReplicationInstance(String classname,
+ Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
+ Path oldLogDir) throws IOException{
+
+ Class<?> clazz = null;
try {
- HRegion region = getRegion(regionName);
- Integer r = region.obtainRowLock(row);
- long lockId = addRowLock(r, region);
- LOG.debug("Row lock " + lockId + " explicitly acquired by client");
- return lockId;
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t, "Error obtaining row lock (fsOk: "
- + this.fsOk + ")"));
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ clazz = Class.forName(classname, true, classLoader);
+ } catch (java.lang.ClassNotFoundException nfe) {
+ throw new IOException("Cound not find class for " + classname);
}
+
+ // create an instance of the replication object.
+ ReplicationService service = (ReplicationService)
+ ReflectionUtils.newInstance(clazz, conf);
+ service.initialize(server, fs, logDir, oldLogDir);
+ return service;
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public void unlockRow(byte[] regionName, long lockId) throws IOException {
- checkOpen();
- NullPointerException npe = null;
- if (regionName == null) {
- npe = new NullPointerException("regionName is null");
- } else if (lockId == -1L) {
- npe = new NullPointerException("lockId is null");
- }
- if (npe != null) {
- IOException io = new IOException("Invalid arguments to unlockRow");
- io.initCause(npe);
- throw io;
- }
- requestCount.incrementAndGet();
+ /**
+ * @param hrs
+ * @return Thread the RegionServer is running in correctly named.
+ * @throws IOException
+ */
+ public static Thread startRegionServer(final HRegionServer hrs)
+ throws IOException {
+ return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
+ }
+
+ /**
+ * @param hrs
+ * @param name
+ * @return Thread the RegionServer is running in correctly named.
+ * @throws IOException
+ */
+ public static Thread startRegionServer(final HRegionServer hrs,
+ final String name) throws IOException {
+ Thread t = new Thread(hrs);
+ t.setName(name);
+ t.start();
+ // Install shutdown hook that will catch signals and run an orderly shutdown
+ // of the hrs.
+ ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
+ .getConfiguration()), hrs, t);
+ return t;
+ }
+
+ /**
+ * Utility for constructing an instance of the passed HRegionServer class.
+ *
+ * @param regionServerClass
+ * @param conf2
+ * @return HRegionServer instance.
+ */
+ public static HRegionServer constructRegionServer(
+ Class<? extends HRegionServer> regionServerClass,
+ final Configuration conf2) {
try {
- HRegion region = getRegion(regionName);
- String lockName = String.valueOf(lockId);
- Integer r = rowlocks.remove(lockName);
- if (r == null) {
- throw new UnknownRowLockException(lockName);
- }
- region.releaseRowLock(r);
- this.leases.cancelLease(lockName);
- LOG.debug("Row lock " + lockId
- + " has been explicitly released by client");
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
+ Constructor<? extends HRegionServer> c = regionServerClass
+ .getConstructor(Configuration.class);
+ return c.newInstance(conf2);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed construction of " + "Regionserver: "
+ + regionServerClass.toString(), e);
}
}
/**
- * Atomically bulk load several HFiles into an open region
- * @return true if successful, false is failed but recoverably (no action)
- * @throws IOException if failed unrecoverably
+ * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
*/
- @Override
- public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
- byte[] regionName) throws IOException {
- checkOpen();
- HRegion region = getRegion(regionName);
- return region.bulkLoadHFiles(familyPaths);
+ public static void main(String[] args) throws Exception {
+ VersionInfo.logVersion();
+ Configuration conf = HBaseConfiguration.create();
+ @SuppressWarnings("unchecked")
+ Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+ .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+
+ new HRegionServerCommandLine(regionServerClass).doMain(args);
}
- // Region open/close direct RPCs
+ /**
+ * Gets the online regions of the specified table.
+ * This method looks at the in-memory onlineRegions. It does not go to <code>.META.</code>.
+ * Only returns <em>online</em> regions. If a region on this table has been
+ * closed during a disable, etc., it will not be included in the returned list.
+ * So, the returned list may not necessarily be ALL regions in this table, its
+ * all the ONLINE regions in the table.
+ * @param tableName
+ * @return Online regions from <code>tableName</code>
+ */
+ public List<HRegion> getOnlineRegions(byte[] tableName) {
+ List<HRegion> tableRegions = new ArrayList<HRegion>();
+ synchronized (this.onlineRegions) {
+ for (HRegion region: this.onlineRegions.values()) {
+ HRegionInfo regionInfo = region.getRegionInfo();
+ if(Bytes.equals(regionInfo.getTableName(), tableName)) {
+ tableRegions.add(region);
+ }
+ }
+ }
+ return tableRegions;
+ }
- @Override
- @QosPriority(priority=HIGH_QOS)
- public RegionOpeningState openRegion(HRegionInfo region)
- throws IOException {
- return openRegion(region, -1);
- }
- @Override
- @QosPriority(priority = HIGH_QOS)
- public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
- throws IOException {
- checkOpen();
- checkIfRegionInTransition(region, OPEN);
- HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName());
- if (null != onlineRegion) {
- // See HBASE-5094. Cross check with META if still this RS is owning the
- // region.
- Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
- this.catalogTracker, region.getRegionName());
- if (this.getServerName().equals(p.getSecond())) {
- LOG.warn("Attempted open of " + region.getEncodedName()
- + " but already online on this server");
- return RegionOpeningState.ALREADY_OPENED;
- } else {
- LOG.warn("The region " + region.getEncodedName()
- + " is online on this server but META does not have this server.");
- this.removeFromOnlineRegions(region.getEncodedName());
- }
- }
- LOG.info("Received request to open region: " +
- region.getRegionNameAsString());
- HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
- this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(),
- true);
- // Need to pass the expected version in the constructor.
- if (region.isRootRegion()) {
- this.service.submit(new OpenRootHandler(this, this, region, htd,
- versionOfOfflineNode));
- } else if (region.isMetaRegion()) {
- this.service.submit(new OpenMetaHandler(this, this, region, htd,
- versionOfOfflineNode));
- } else {
- this.service.submit(new OpenRegionHandler(this, this, region, htd,
- versionOfOfflineNode));
- }
- return RegionOpeningState.OPENED;
+ // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
+ public String[] getCoprocessors() {
+ HBaseProtos.ServerLoad sl = buildServerLoad();
+ return sl == null? null:
+ ServerLoad.getRegionServerCoprocessors(new ServerLoad(sl));
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public void openRegions(List<HRegionInfo> regions)
- throws IOException {
- checkOpen();
- LOG.info("Received request to open " + regions.size() + " region(s)");
- for (HRegionInfo region: regions) openRegion(region);
+ /**
+ * Register bean with platform management server
+ */
+ void registerMBean() {
+ MXBeanImpl mxBeanInfo = MXBeanImpl.init(this);
+ mxBean = MBeanUtil.registerMBean("RegionServer", "RegionServer",
+ mxBeanInfo);
+ LOG.info("Registered RegionServer MXBean");
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public boolean closeRegion(HRegionInfo region)
- throws IOException {
- return closeRegion(region, true, -1);
+ /**
+ * Instantiated as a row lock lease. If the lease times out, the row lock is
+ * released
+ */
+ private class RowLockListener implements LeaseListener {
+ private final String lockName;
+ private final HRegion region;
+
+ RowLockListener(final String lockName, final HRegion region) {
+ this.lockName = lockName;
+ this.region = region;
+ }
+
+ public void leaseExpired() {
+ LOG.info("Row Lock " + this.lockName + " lease expired");
+ Integer r = rowlocks.remove(this.lockName);
+ if (r != null) {
+ region.releaseRowLock(r);
+ }
+ }
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public boolean closeRegion(final HRegionInfo region,
- final int versionOfClosingNode)
- throws IOException {
- return closeRegion(region, true, versionOfClosingNode);
+ /**
+ * Instantiated as a scanner lease. If the lease times out, the scanner is
+ * closed
+ */
+ private class ScannerListener implements LeaseListener {
+ private final String scannerName;
+
+ ScannerListener(final String n) {
+ this.scannerName = n;
+ }
+
+ public void leaseExpired() {
+ RegionScanner s = scanners.remove(this.scannerName);
+ if (s != null) {
+ LOG.info("Scanner " + this.scannerName + " lease expired on region "
+ + s.getRegionInfo().getRegionNameAsString());
+ try {
+ HRegion region = getRegion(s.getRegionInfo().getRegionName());
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preScannerClose(s);
+ }
+
+ s.close();
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(s);
+ }
+ } catch (IOException e) {
+ LOG.error("Closing scanner for "
+ + s.getRegionInfo().getRegionNameAsString(), e);
+ }
+ } else {
+ LOG.info("Scanner " + this.scannerName + " lease expired");
+ }
+ }
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public boolean closeRegion(HRegionInfo region, final boolean zk)
- throws IOException {
- return closeRegion(region, zk, -1);
+ /**
+ * Method to get the Integer lock identifier used internally from the long
+ * lock identifier used by the client.
+ *
+ * @param lockId
+ * long row lock identifier from client
+ * @return intId Integer row lock used internally in HRegion
+ * @throws IOException
+ * Thrown if this is not a valid client lock id.
+ */
+ Integer getLockFromId(long lockId) throws IOException {
+ if (lockId == -1L) {
+ return null;
+ }
+ String lockName = String.valueOf(lockId);
+ Integer rl = rowlocks.get(lockName);
+ if (rl == null) {
+ throw new UnknownRowLockException("Invalid row lock");
+ }
+ this.leases.renewLease(lockName);
+ return rl;
}
- @QosPriority(priority=HIGH_QOS)
- protected boolean closeRegion(HRegionInfo region, final boolean zk,
- final int versionOfClosingNode)
- throws IOException {
- checkOpen();
- LOG.info("Received close region: " + region.getRegionNameAsString() +
- ". Version of ZK closing node:" + versionOfClosingNode);
- boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
- if (!hasit) {
- LOG.warn("Received close for region we are not serving; " +
- region.getEncodedName());
- throw new NotServingRegionException("Received close for "
- + region.getRegionNameAsString() + " but we are not serving it");
+ /**
+ * Called to verify that this server is up and running.
+ *
+ * @throws IOException
+ */
+ protected void checkOpen() throws IOException {
+ if (this.stopped || this.abortRequested) {
+ throw new RegionServerStoppedException("Server " + getServerName() +
+ " not running" + (this.abortRequested ? ", aborting" : ""));
+ }
+ if (!fsOk) {
+ throw new RegionServerStoppedException("File system not available");
}
- checkIfRegionInTransition(region, CLOSE);
- return closeRegion(region, false, zk, versionOfClosingNode);
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public boolean closeRegion(byte[] encodedRegionName, boolean zk)
- throws IOException {
- return closeRegion(encodedRegionName, false, zk);
+ protected void checkIfRegionInTransition(HRegionInfo region,
+ String currentAction) throws RegionAlreadyInTransitionException {
+ byte[] encodedName = region.getEncodedNameAsBytes();
+ if (this.regionsInTransitionInRS.containsKey(encodedName)) {
+ boolean openAction = this.regionsInTransitionInRS.get(encodedName);
+ // The below exception message will be used in master.
+ throw new RegionAlreadyInTransitionException("Received:" + currentAction +
+ " for the region:" + region.getRegionNameAsString() +
+ " ,which we are already trying to " +
+ (openAction ? OPEN : CLOSE)+ ".");
+ }
}
/**
- * @param encodedRegionName
- * encodedregionName to close
- * @param abort
- * True if we are aborting
- * @param zk
- * True if we are to update zk about the region close; if the close
- * was orchestrated by master, then update zk. If the close is being
- * run by the regionserver because its going down, don't update zk.
+ * @param region Region to close
+ * @param abort True if we are aborting
+ * @param zk True if we are to update zk about the region close; if the close
+ * was orchestrated by master, then update zk. If the close is being run by
+ * the regionserver because its going down, don't update zk.
* @return True if closed a region.
*/
- protected boolean closeRegion(byte[] encodedRegionName, final boolean abort,
- final boolean zk) throws IOException {
- String encodedRegionNameStr = Bytes.toString(encodedRegionName);
- HRegion region = this.getFromOnlineRegions(encodedRegionNameStr);
- if (null != region) {
- return closeRegion(region.getRegionInfo(), abort, zk);
- }
- LOG.error("The specified region name" + encodedRegionNameStr
- + " does not exist to close the region.");
- return false;
+ protected boolean closeRegion(HRegionInfo region, final boolean abort,
+ final boolean zk) {
+ return closeRegion(region, abort, zk, -1);
}
- // Manual remote region administration RPCs
-
- @Override
- @QosPriority(priority=HIGH_QOS)
- public void flushRegion(HRegionInfo regionInfo)
- throws NotServingRegionException, IOException {
- checkOpen();
- LOG.info("Flushing " + regionInfo.getRegionNameAsString());
- HRegion region = getRegion(regionInfo.getRegionName());
- region.flushcache();
+ /**
+ * @param region Region to close
+ * @param abort True if we are aborting
+ * @param zk True if we are to update zk about the region close; if the close
+ * was orchestrated by master, then update zk. If the close is being run by
+ * the regionserver because its going down, don't update zk.
+ * @param versionOfClosingNode
+ * the version of znode to compare when RS transitions the znode from
+ * CLOSING state.
+ * @return True if closed a region.
+ */
+ protected boolean closeRegion(HRegionInfo region, final boolean abort,
+ final boolean zk, final int versionOfClosingNode) {
+ if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
+ LOG.warn("Received close for region we are already opening or closing; " +
+ region.getEncodedName());
+ return false;
+ }
+ this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false);
+ CloseRegionHandler crh = null;
+ if (region.isRootRegion()) {
+ crh = new CloseRootHandler(this, this, region, abort, zk,
+ versionOfClosingNode);
+ } else if (region.isMetaRegion()) {
+ crh = new CloseMetaHandler(this, this, region, abort, zk,
+ versionOfClosingNode);
+ } else {
+ crh = new CloseRegionHandler(this, this, region, abort, zk,
+ versionOfClosingNode);
+ }
+ this.service.submit(crh);
+ return true;
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public void splitRegion(HRegionInfo regionInfo)
- throws NotServingRegionException, IOException {
- splitRegion(regionInfo, null);
+ /**
+ * @param regionName
+ * @return HRegion for the passed binary <code>regionName</code> or null if
+ * named region is not member of the online regions.
+ */
+ public HRegion getOnlineRegion(final byte[] regionName) {
+ String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
+ return this.onlineRegions.get(encodedRegionName);
}
@Override
- public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
- throws NotServingRegionException, IOException {
- checkOpen();
- HRegion region = getRegion(regionInfo.getRegionName());
- region.flushcache();
- region.forceSplit(splitPoint);
- compactSplitThread.requestSplit(region, region.checkSplit());
+ public HRegion getFromOnlineRegions(final String encodedRegionName) {
+ return this.onlineRegions.get(encodedRegionName);
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public void compactRegion(HRegionInfo regionInfo, boolean major)
- throws NotServingRegionException, IOException {
- checkOpen();
- HRegion region = getRegion(regionInfo.getRegionName());
- if (major) {
- region.triggerMajorCompaction();
- }
- compactSplitThread.requestCompaction(region, "User-triggered "
- + (major ? "major " : "") + "compaction",
- CompactSplitThread.PRIORITY_USER);
+ /**
+ * Protected utility method for safely obtaining an HRegion handle.
+ *
+ * @param regionName
+ * Name of online {@link HRegion} to return
+ * @return {@link HRegion} for <code>regionName</code>
+ * @throws NotServingRegionException
+ */
+ protected HRegion getRegion(final byte[] regionName)
+ throws NotServingRegionException {
+ HRegion region = null;
+ region = getOnlineRegion(regionName);
+ if (region == null) {
+ throw new NotServingRegionException("Region is not online: " +
+ Bytes.toStringBinary(regionName));
+ }
+ return region;
}
- /** @return the info server */
- public InfoServer getInfoServer() {
- return infoServer;
+ /*
+ * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+ * IOE if it isn't already.
+ *
+ * @param t Throwable
+ *
+ * @return Throwable converted to an IOE; methods can only let out IOEs.
+ */
+ protected Throwable cleanup(final Throwable t) {
+ return cleanup(t, null);
}
- /**
- * @return true if a stop has been requested.
+ /*
+ * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+ * IOE if it isn't already.
+ *
+ * @param t Throwable
+ *
+ * @param msg Message to log in error. Can be null.
+ *
+ * @return Throwable converted to an IOE; methods can only let out IOEs.
*/
- public boolean isStopped() {
- return this.stopped;
+ protected Throwable cleanup(final Throwable t, final String msg) {
+ // Don't log as error if NSRE; NSRE is 'normal' operation.
+ if (t instanceof NotServingRegionException) {
+ LOG.debug("NotServingRegionException; " + t.getMessage());
+ return t;
+ }
+ if (msg == null) {
+ LOG.error("", RemoteExceptionHandler.checkThrowable(t));
+ } else {
+ LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
+ }
+ if (!checkOOME(t)) {
+ checkFileSystem();
+ }
+ return t;
}
- @Override
- public boolean isStopping() {
- return this.stopping;
+ /*
+ * @param t
+ *
+ * @return Make <code>t</code> an IOE if it isn't already.
+ */
+ protected IOException convertThrowableToIOE(final Throwable t) {
+ return convertThrowableToIOE(t, null);
}
- /**
+ /*
+ * @param t
*
- * @return the configuration
+ * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
+ *
+ * @return Make <code>t</code> an IOE if it isn't already.
*/
- public Configuration getConfiguration() {
- return conf;
+ protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
+ return (t instanceof IOException ? (IOException) t : msg == null
+ || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
}
- /** @return the write lock for the server */
- ReentrantReadWriteLock.WriteLock getWriteLock() {
- return lock.writeLock();
+ /*
+ * Check if an OOME and, if so, abort immediately to avoid creating more objects.
+ *
+ * @param e
+ *
+ * @return True if we OOME'd and are aborting.
+ */
+ public boolean checkOOME(final Throwable e) {
+ boolean stop = false;
+ try {
+ if (e instanceof OutOfMemoryError
+ || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
+ || (e.getMessage() != null && e.getMessage().contains(
+ "java.lang.OutOfMemoryError"))) {
+ stop = true;
+ LOG.fatal(
+ "Run out of memory; HRegionServer will abort itself immediately", e);
+ }
+ } finally {
+ if (stop) {
+ Runtime.getRuntime().halt(1);
+ }
+ }
+ return stop;
}
- @Override
- @QosPriority(priority=HIGH_QOS)
- public List<HRegionInfo> getOnlineRegions() throws IOException {
- checkOpen();
- List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
- for (Map.Entry<String,HRegion> e: this.onlineRegions.entrySet()) {
- list.add(e.getValue().getRegionInfo());
+ /**
+ * Checks to see if the file system is still accessible. If not, sets
+ * abortRequested and stopRequested
+ *
+ * @return false if file system is not available
+ */
+ public boolean checkFileSystem() {
+ if (this.fsOk && this.fs != null) {
+ try {
+ FSUtils.checkFileSystemAvailable(this.fs);
+ } catch (IOException e) {
+ abort("File System not available", e);
+ this.fsOk = false;
+ }
}
- Collections.sort(list);
- return list;
+ return this.fsOk;
}
- public int getNumberOfOnlineRegions() {
- return this.onlineRegions.size();
+ protected long addRowLock(Integer r, HRegion region)
+ throws LeaseStillHeldException {
+ long lockId = nextLong();
+ String lockName = String.valueOf(lockId);
+ rowlocks.put(lockName, r);
+ this.leases.createLease(lockName, new RowLockListener(lockName, region));
+ return lockId;
}
- boolean isOnlineRegionsEmpty() {
- return this.onlineRegions.isEmpty();
+ protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+ long scannerId = nextLong();
+ String scannerName = String.valueOf(scannerId);
+ scanners.put(scannerName, s);
+ this.leases.createLease(scannerName, new ScannerListener(scannerName));
+ return scannerId;
}
/**
- * @param encodedRegionName
- * @return JSON Map of labels to values for passed in <code>encodedRegionName</code>
- * @throws IOException
+ * Generate a random positive long number
+ *
+ * @return a random positive long number
*/
- public byte [] getRegionStats(final String encodedRegionName)
- throws IOException {
- HRegion r = null;
- synchronized (this.onlineRegions) {
- r = this.onlineRegions.get(encodedRegionName);
+ protected long nextLong() {
+ long n = rand.nextLong();
+ if (n == 0) {
+ return nextLong();
}
- if (r == null) return null;
- ObjectMapper mapper = new ObjectMapper();
- int stores = 0;
- int storefiles = 0;
- int storefileSizeMB = 0;
- int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
- int storefileIndexSizeMB = 0;
- long totalCompactingKVs = 0;
- long currentCompactedKVs = 0;
- synchronized (r.stores) {
- stores += r.stores.size();
- for (Store store : r.stores.values()) {
- storefiles += store.getStorefilesCount();
- storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
- storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
- }
+ if (n < 0) {
+ n = -n;
}
- Map<String, Integer> map = new TreeMap<String, Integer>();
- map.put("stores", stores);
- map.put("storefiles", storefiles);
- map.put("storefileSizeMB", storefileIndexSizeMB);
- map.put("memstoreSizeMB", memstoreSizeMB);
- StringWriter w = new StringWriter();
- mapper.writeValue(w, map);
- w.close();
- return Bytes.toBytes(w.toString());
+ return n;
}
+ // Start Client methods
+
/**
- * For tests and web ui.
- * This method will only work if HRegionServer is in the same JVM as client;
- * HRegion cannot be serialized to cross an rpc.
- * @see #getOnlineRegions()
+ * Get data from a table.
+ *
+ * @param controller the RPC controller
+ * @param request the get request
+ * @throws ServiceException
*/
- public Collection<HRegion> getOnlineRegionsLocalContext() {
- Collection<HRegion> regions = this.onlineRegions.values();
- return Collections.unmodifiableCollection(regions);
- }
-
@Override
- public void addToOnlineRegions(HRegion region) {
- this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
+ public GetResponse get(final RpcController controller,
+ final GetRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ GetResponse.Builder builder = GetResponse.newBuilder();
+ ClientProtos.Get get = request.getGet();
+ Boolean existence = null;
+ Result r = null;
+ if (request.getClosestRowBefore()) {
+ if (get.getColumnCount() != 1) {
+ throw new DoNotRetryIOException(
+ "get ClosestRowBefore supports one and only one family now, not "
+ + get.getColumnCount() + " families");
+ }
+ byte[] row = get.getRow().toByteArray();
+ byte[] family = get.getColumn(0).getFamily().toByteArray();
+ r = region.getClosestRowBefore(row, family);
+ } else {
+ Get clientGet = ProtobufUtil.toGet(get);
+ if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
+ existence = region.getCoprocessorHost().preExists(clientGet);
+ }
+ if (existence == null) {
+ Integer lock = getLockFromId(clientGet.getLockId());
+ r = region.get(clientGet, lock);
+ if (request.getExistenceOnly()) {
+ boolean exists = r != null && !r.isEmpty();
+ if (region.getCoprocessorHost() != null) {
+ exists = region.getCoprocessorHost().postExists(clientGet, exists);
+ }
+ existence = Boolean.valueOf(exists);
+ }
+ }
+ }
+ if (existence != null) {
+ builder.setExists(existence.booleanValue());
+ } else if (r != null) {
+ builder.setResult(ProtobufUtil.toResult(r));
+ }
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
}
+ /**
+ * Mutate data in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the mutate request
+ * @throws ServiceException
+ */
@Override
- public boolean removeFromOnlineRegions(final String encodedName) {
- HRegion toReturn = null;
- toReturn = this.onlineRegions.remove(encodedName);
-
- //Clear all of the dynamic metrics as they are now probably useless.
- //This is a clear because dynamic metrics could include metrics per cf and
- //per hfile. Figuring out which cfs, hfiles, and regions are still relevant to
- //this region server would be an onerous task. Instead just clear everything
- //and on the next tick of the metrics everything that is still relevant will be
- //re-added.
- this.dynamicMetrics.clear();
- return toReturn != null;
+ public MutateResponse mutate(final RpcController controller,
+ final MutateRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ MutateResponse.Builder builder = MutateResponse.newBuilder();
+ Mutate mutate = request.getMutate();
+ if (!region.getRegionInfo().isMetaTable()) {
+ cacheFlusher.reclaimMemStoreMemory();
+ }
+ Integer lock = null;
+ Result r = null;
+ Boolean processed = null;
+ MutateType type = mutate.getMutateType();
+ switch (type) {
+ case APPEND:
+ r = append(region, mutate);
+ break;
+ case INCREMENT:
+ r = increment(region, mutate);
+ break;
+ case PUT:
+ Put put = ProtobufUtil.toPut(mutate);
+ lock = getLockFromId(put.getLockId());
+ if (request.hasCondition()) {
+ Condition condition = request.getCondition();
+ byte[] row = condition.getRow().toByteArray();
+ byte[] family = condition.getFamily().toByteArray();
+ byte[] qualifier = condition.getQualifier().toByteArray();
+ CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+ WritableByteArrayComparable comparator =
+ (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+ if (region.getCoprocessorHost() != null) {
+ processed = region.getCoprocessorHost().preCheckAndPut(
+ row, family, qualifier, compareOp, comparator, put);
+ }
+ if (processed == null) {
+ boolean result = region.checkAndMutate(row, family,
+ qualifier, compareOp, comparator, put, lock, true);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndPut(row, family,
+ qualifier, compareOp, comparator, put, result);
+ }
+ processed = Boolean.valueOf(result);
+ }
+ } else {
+ region.put(put, lock);
+ processed = Boolean.TRUE;
+ }
+ break;
+ case DELETE:
+ Delete delete = ProtobufUtil.toDelete(mutate);
+ lock = getLockFromId(delete.getLockId());
+ if (request.hasCondition()) {
+ Condition condition = request.getCondition();
+ byte[] row = condition.getRow().toByteArray();
+ byte[] family = condition.getFamily().toByteArray();
+ byte[] qualifier = condition.getQualifier().toByteArray();
+ CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+ WritableByteArrayComparable comparator =
+ (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+ if (region.getCoprocessorHost() != null) {
+ processed = region.getCoprocessorHost().preCheckAndDelete(
+ row, family, qualifier, compareOp, comparator, delete);
+ }
+ if (processed == null) {
+ boolean result = region.checkAndMutate(row, family,
+ qualifier, compareOp, comparator, delete, lock, true);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndDelete(row, family,
+ qualifier, compareOp, comparator, delete, result);
+ }
+ processed = Boolean.valueOf(result);
+ }
+ } else {
+ region.delete(delete, lock, delete.getWriteToWAL());
+ processed = Boolean.TRUE;
+ }
+ break;
+ default:
+ throw new DoNotRetryIOException(
+ "Unsupported mutate type: " + type.name());
+ }
+ if (processed != null) {
+ builder.setProcessed(processed.booleanValue());
+ } else if (r != null) {
+ builder.setResult(ProtobufUtil.toResult(r));
+ }
+ return builder.build();
+ } catch (IOException ie) {
+ checkFileSystem();
+ throw new ServiceException(ie);
+ }
}
+ //
+ // remote scanner interface
+ //
+
/**
- * @return A new Map of online regions sorted by region size with the first
- * entry being the biggest.
+ * Scan data in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the scan request
+ * @throws ServiceException
*/
- public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
- // we'll sort the regions in reverse
- SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
- new Comparator<Long>() {
- public int compare(Long a, Long b) {
- return -1 * a.compareTo(b);
+ @Override
+ public ScanResponse scan(final RpcController controller,
+ final ScanRequest request) throws ServiceException {
+ Leases.Lease lease = null;
+ String scannerName = null;
+ try {
+ if (!request.hasScannerId() && !request.hasScan()) {
+ throw new DoNotRetryIOException(
+ "Missing required input: scannerId or scan");
+ }
+ long scannerId = -1;
+ if (request.hasScannerId()) {
+ scannerId = request.getScannerId();
+ scannerName = String.valueOf(scannerId);
+ }
+ try {
+ checkOpen();
+ } catch (IOException e) {
+ // If checkOpen failed, server not running or filesystem gone,
+ // cancel this lease; filesystem is gone or we're closing or something.
+ if (scannerName != null) {
+ try {
+ leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ LOG.info("Server shutting down and client tried to access missing scanner " +
+ scannerName);
}
- });
- // Copy over all regions. Regions are sorted by size with biggest first.
- for (HRegion region : this.onlineRegions.values()) {
- sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
- }
- return sortedRegions;
- }
+ }
+ throw e;
+ }
+ requestCount.incrementAndGet();
+
+ try {
+ int ttl = 0;
+ HRegion region = null;
+ RegionScanner scanner = null;
+ boolean moreResults = true;
+ boolean closeScanner = false;
+ ScanResponse.Builder builder = ScanResponse.newBuilder();
+ if (request.hasCloseScanner()) {
+ closeScanner = request.getCloseScanner();
+ }
+ int rows = 1;
+ if (request.hasNumberOfRows()) {
+ rows = request.getNumberOfRows();
+ }
+ if (request.hasScannerId()) {
+ scanner = scanners.get(scannerName);
+ if (scanner == null) {
+ throw new UnknownScannerException(
+ "Name: " + scannerName + ", already closed?");
+ }
+ region = getRegion(scanner.getRegionInfo().getRegionName());
+ } else {
+ region = getRegion(request.getRegion());
+ ClientProtos.Scan protoScan = request.getScan();
+ Scan scan = ProtobufUtil.toScan(protoScan);
+ region.prepareScanner(scan);
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().preScannerOpen(scan);
+ }
+ if (scanner == null) {
+ scanner = region.getScanner(scan);
+ }
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+ }
+ scannerId = addScanner(scanner);
+ scannerName = String.valueOf(scannerId);
+ ttl = leases.leasePeriod;
+ }
- /** @return the request count */
- public AtomicInteger getRequestCount() {
- return this.requestCount;
- }
+ if (rows > 0) {
+ try {
+ // Remove lease while its being processed in server; protects against case
+ // where processing of request takes > lease expiration time.
+ lease = leases.removeLease(scannerName);
+ List<Result> results = new ArrayList<Result>(rows);
+ long currentScanResultSize = 0;
+
+ boolean done = false;
+ // Call coprocessor. Get region info from scanner.
+ if (region != null && region.getCoprocessorHost() != null) {
+ Boolean bypass = region.getCoprocessorHost().preScannerNext(
+ scanner, results, rows);
+ if (!results.isEmpty()) {
+ for (Result r : results) {
+ for (KeyValue kv : r.raw()) {
+ currentScanResultSize += kv.heapSize();
+ }
+ }
+ }
+ if (bypass != null && bypass.booleanValue()) {
+ done = true;
+ }
+ }
- /**
- * @return time stamp in millis of when this region server was started
- */
- public long getStartcode() {
- return this.startcode;
- }
+ if (!done) {
+ long maxResultSize = scanner.getMaxResultSize();
+ if (maxResultSize <= 0) {
+ maxResultSize = maxScannerResultSize;
+ }
+ List<KeyValue> values = new ArrayList<KeyValue>();
+ for (int i = 0; i < rows
+ && currentScanResultSize < maxResultSize; i++) {
+ // Collect values to be returned here
+ boolean moreRows = scanner.next(values, SchemaMetrics.METRIC_NEXTSIZE);
+ if (!values.isEmpty()) {
+ for (KeyValue kv : values) {
+ currentScanResultSize += kv.heapSize();
+ }
+ results.add(new Result(values));
+ }
+ if (!moreRows) {
+ break;
+ }
+ values.clear();
+ }
- /** @return reference to FlushRequester */
- public FlushRequester getFlushRequester() {
- return this.cacheFlusher;
+ // coprocessor postNext hook
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+ }
+ }
+
+ // If the scanner's filter - if any - is done with the scan
+ // and wants to tell the client to stop the scan. This is done by passing
+ // a null result, and setting moreResults to false.
+ if (scanner.isFilterDone() && results.isEmpty()) {
+ moreResults = false;
+ results = null;
+ } else {
+ for (Result result: results) {
+ if (result != null) {
+ builder.addResult(ProtobufUtil.toResult(result));
+ }
+ }
+ }
+ } finally {
+ // We're done. On way out re-add the above removed lease.
+ // Adding resets expiration time on lease.
+ if (scanners.containsKey(scannerName)) {
+ if (lease != null) leases.addLease(lease);
+ ttl = leases.leasePeriod;
+ }
+ }
+ }
+
+ if (!moreResults || closeScanner) {
+ ttl = 0;
+ moreResults = false;
+ if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost().preScannerClose(scanner)) {
+ return builder.build(); // bypass
+ }
+ }
+ scanner = scanners.remove(scannerName);
+ if (scanner != null) {
+ scanner.close();
+ leases.cancelLease(scannerName);
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(scanner);
+ }
+ }
+ }
+
+ if (ttl > 0) {
+ builder.setTtl(ttl);
+ }
+ builder.setScannerId(scannerId);
+ builder.setMoreResults(moreResults);
+ return builder.build();
+ } catch (Throwable t) {
+ if (scannerName != null &&
+ t instanceof NotServingRegionException) {
+ scanners.remove(scannerName);
+ }
+ throw convertThrowableToIOE(cleanup(t));
+ }
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
}
/**
- * Get the top N most loaded regions this server is serving so we can tell the
- * master which regions it can reallocate if we're overloaded. TODO: actually
- * calculate which regions are most loaded. (Right now, we're just grabbing
- * the first N regions being served regardless of load.)
+ * Lock a row in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the lock row request
+ * @throws ServiceException
*/
- protected HRegionInfo[] getMostLoadedRegions() {
- ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
- for (HRegion r : onlineRegions.values()) {
- if (!r.isAvailable()) {
- continue;
+ @Override
+ public LockRowResponse lockRow(final RpcController controller,
+ final LockRowRequest request) throws ServiceException {
+ try {
+ if (request.getRowCount() != 1) {
+ throw new DoNotRetryIOException(
+ "lockRow supports only one row now, not " + request.getRowCount() + " rows");
}
- if (regions.size() < numRegionsToReport) {
- regions.add(r.getRegionInfo());
- } else {
- break;
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ byte[] row = request.getRow(0).toByteArray();
+ try {
+ Integer r = region.obtainRowLock(row);
+ long lockId = addRowLock(r, region);
+ LOG.debug("Row lock " + lockId + " explicitly acquired by client");
+ LockRowResponse.Builder builder = LockRowResponse.newBuilder();
+ builder.setLockId(lockId);
+ return builder.build();
+ } catch (Throwable t) {
+ throw convertThrowableToIOE(cleanup(t,
+ "Error obtaining row lock (fsOk: " + this.fsOk + ")"));
}
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
}
- return regions.toArray(new HRegionInfo[regions.size()]);
}
+ /**
+ * Unlock a locked row in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the unlock row request
+ * @throws ServiceException
+ */
@Override
@QosPriority(priority=HIGH_QOS)
- public ProtocolSignature getProtocolSignature(
- String protocol, long version, int clientMethodsHashCode)
- throws IOException {
- if (protocol.equals(HRegionInterface.class.getName())) {
- return new ProtocolSignature(HRegionInterface.VERSION, null);
- } else if (protocol.equals(ClientProtocol.class.getName())) {
[... 1144 lines stripped ...]