You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/03/12 22:17:20 UTC
svn commit: r1576909 [10/18] - in /hbase/branches/0.89-fb/src: ./
examples/thrift/ main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/avro/
main/java/org/apache/hadoop/hbase/avro/generated/
main/java/org/apache/hadoop/hbase/client/ ma...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactUtility.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactUtility.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactUtility.java Wed Mar 12 21:17:13 2014
@@ -11,26 +11,30 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import org.apache.commons.cli.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
/**
* A compaction Utility for the Store files of a region Server.
* This utility exposes a hook to compact specific store files.
@@ -75,9 +79,6 @@ public class CompactUtility {
Store store = hRegion.getStore(this.cFamily.getName());
List<StoreFile> storeFiles = new ArrayList<StoreFile>(
store.getStorefiles());
- if (storeFiles == null) {
- throw new Exception("No Store Files To Compact");
- }
if (filesCompacting != null) {
for (Iterator<StoreFile> sFile = storeFiles.iterator(); sFile.hasNext(); ) {
if (!filesCompacting.contains(sFile.next().getPath())) {
@@ -134,7 +135,7 @@ public class CompactUtility {
HColumnDescriptor cFamily = new HColumnDescriptor(cmd.getOptionValue("c"));
String[] files = cmd.getArgs();
- List<Path> filePaths = new ArrayList<Path>();
+ List<Path> filePaths = new ArrayList<Path>(files.length);
for (String file : files) {
filePaths.add(new Path(file));
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java Wed Mar 12 21:17:13 2014
@@ -20,8 +20,13 @@
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.Random;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -29,12 +34,8 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.Random;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
public class CompactionManager {
@@ -108,11 +109,12 @@ public class CompactionManager {
*/
private CompactSelection selectExpiredSFs
(CompactSelection candidates, long maxExpiredTimeStamp) {
- if (candidates.filesToCompact == null || candidates.filesToCompact.size() == 0)
+ if (candidates.filesToCompact == null
+ || candidates.filesToCompact.isEmpty()) {
return null;
+ }
+
ArrayList<StoreFile> expiredStoreFiles = null;
- boolean hasExpiredStoreFiles = false;
- CompactSelection expiredSFSelection = null;
for (StoreFile storeFile : candidates.filesToCompact) {
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
@@ -120,18 +122,18 @@ public class CompactionManager {
+ storeFile.getPath() + " whose maxTimeStamp is "
+ storeFile.getReader().getMaxTimestamp()
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
- if (!hasExpiredStoreFiles) {
+ if (expiredStoreFiles == null) {
expiredStoreFiles = new ArrayList<StoreFile>();
- hasExpiredStoreFiles = true;
}
expiredStoreFiles.add(storeFile);
}
}
- if (hasExpiredStoreFiles) {
- expiredSFSelection = new CompactSelection(expiredStoreFiles);
+ if (expiredStoreFiles == null) {
+ return null;
}
- return expiredSFSelection;
+
+ return new CompactSelection(expiredStoreFiles);
}
/**
@@ -244,12 +246,13 @@ public class CompactionManager {
int start = 0;
double r = comConf.getCompactionRatio();
- if (isOffPeakHour() && !(candidates.getNumOutStandingOffPeakCompactions() > 0)) {
+ if (isOffPeakHour()
+ && !(CompactSelection.getNumOutStandingOffPeakCompactions() > 0)) {
r = comConf.getCompactionRatioOffPeak();
candidates.setOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + r
+ ", numOutstandingOffPeakCompactions is now "
- + candidates.getNumOutStandingOffPeakCompactions());
+ + CompactSelection.getNumOutStandingOffPeakCompactions());
}
// get store file sizes for incremental compacting selection.
@@ -396,9 +399,4 @@ public class CompactionManager {
}
return (currentHour >= startHour || currentHour < endHour);
}
-
- private boolean isValidHour(int hour) {
- return (hour >= 0 && hour <= 23);
- }
-
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Wed Mar 12 21:17:13 2014
@@ -80,10 +80,12 @@ public class ExplicitColumnTracker imple
/**
* Done when there are no more columns to match against.
*/
+ @Override
public boolean done() {
- return this.columns.size() == 0;
+ return this.columns.isEmpty();
}
+ @Override
public ColumnCount getColumnHint() {
return this.column;
}
@@ -100,11 +102,12 @@ public class ExplicitColumnTracker imple
* scanners' read points.)
* @return MatchCode telling ScanQueryMatcher what action to take
*/
+ @Override
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, long timestamp, boolean ignoreCount) {
do {
// No more columns left, we are done with this query
- if(this.columns.size() == 0) {
+ if (this.columns.isEmpty()) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
}
@@ -179,6 +182,7 @@ public class ExplicitColumnTracker imple
/**
* Called at the end of every StoreFile or memstore.
*/
+ @Override
public void update() {
if(this.columns.size() != 0) {
this.index = 0;
@@ -190,6 +194,7 @@ public class ExplicitColumnTracker imple
}
// Called between every row.
+ @Override
public void reset() {
buildColumnList();
this.index = 0;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Mar 12 21:17:13 2014
@@ -1,5 +1,5 @@
/*
- * Copyright 2010 The Apache Software Foundation
+ * Copyright 2014 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -26,11 +26,13 @@ import java.io.UnsupportedEncodingExcept
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -53,8 +55,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.Date;
-import java.text.SimpleDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -72,20 +72,23 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TRowMutations;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.L2Cache;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;
+import org.apache.hadoop.hbase.io.hfile.histogram.HistogramUtils;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -143,7 +146,7 @@ import com.google.common.collect.Lists;
* regionName is a unique identifier for this HRegion. (startKey, endKey]
* defines the keyspace for this HRegion.
*/
-public class HRegion implements HeapSize {
+public class HRegion implements HeapSize, ConfigurationObserver {
public static final Log LOG = LogFactory.getLog(HRegion.class);
static final String SPLITDIR = "splits";
static final String MERGEDIR = "merges";
@@ -585,6 +588,8 @@ public class HRegion implements HeapSize
*/
public long initialize(final Progressable reporter)
throws IOException {
+ HRegionServer.configurationManager.registerObserver(this);
+
MonitoredTask status = TaskMonitor.get().createStatus(
"Initializing region " + this);
try {
@@ -842,6 +847,8 @@ public class HRegion implements HeapSize
* @throws IOException e
*/
public List<StoreFile> close(final boolean abort) throws IOException {
+ HRegionServer.configurationManager.deregisterObserver(this);
+
MonitoredTask status = TaskMonitor.get().createStatus(
"Closing region " + this + (abort ? " due to abort" : ""));
if (isClosed()) {
@@ -1411,7 +1418,7 @@ public class HRegion implements HeapSize
}
}
// Didn't find any CFs which were above the threshold for selection.
- if (specificStoresToFlush.size() == 0) {
+ if (specificStoresToFlush.isEmpty()) {
LOG.debug("Since none of the CFs were above the size, flushing all.");
specificStoresToFlush = stores.values();
}
@@ -1783,8 +1790,7 @@ public class HRegion implements HeapSize
if (key == null) {
return null;
}
- Get get = new Get(key.getRow());
- get.addFamily(family);
+ Get get = new Get.Builder(key.getRow()).addFamily(family).create();
return get(get, null);
} finally {
splitsAndClosesLock.readLock().unlock();
@@ -1936,9 +1942,8 @@ public class HRegion implements HeapSize
}
count = kvCount.get(qual);
- Get get = new Get(kv.getRow());
- get.setMaxVersions(count);
- get.addColumn(family, qual);
+ Get get = new Get.Builder(kv.getRow()).setMaxVersions(count)
+ .addColumn(family, qual).create();
List<KeyValue> result = get(get);
@@ -2344,12 +2349,12 @@ public class HRegion implements HeapSize
get.addColumn(family, qualifier);
// Lock row
Integer lid = getLock(lockId, get.getRow(), true);
- List<KeyValue> result = new ArrayList<KeyValue>();
+ List<KeyValue> result = new ArrayList<KeyValue>(1);
try {
result = get(get);
boolean matches = false;
- if (result.size() == 0) {
+ if (result.isEmpty()) {
if (expectedValue == null ) {
matches = true;
}
@@ -3731,6 +3736,10 @@ public class HRegion implements HeapSize
byte[] byteNow = Bytes.toBytes(now);
try {
// 5. Check mutations and apply edits to a single WALEdit
+ if (rm instanceof TRowMutations) {
+ rm = RowMutations.Builder.createFromTRowMutations((TRowMutations)rm);
+ }
+
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
@@ -3818,8 +3827,7 @@ public class HRegion implements HeapSize
Store store = stores.get(family);
// Get the old value:
- Get get = new Get(row);
- get.addColumn(family, qualifier);
+ Get get = new Get.Builder(row).addColumn(family, qualifier).create();
List<KeyValue> results = get(get);
@@ -4083,21 +4091,13 @@ public class HRegion implements HeapSize
};
/**
- * Returns null is no data is available.
- * @return
- * @throws IOException
+ * @return null if no data is available.
*/
public HFileHistogram getHistogram() throws IOException {
- List<HFileHistogram> histograms = new ArrayList<HFileHistogram>();
- if (stores.size() == 0) return null;
- for (Store s : stores.values()) {
- HFileHistogram hist = s.getHistogram();
- if (hist != null) histograms.add(hist);
+ if (stores.isEmpty()) {
+ return null;
}
- // If none of the stores produce a histogram returns null.
- if (histograms.size() == 0) return null;
- HFileHistogram h = histograms.get(0).compose(histograms);
- return h;
+ return HistogramUtils.mergeOfStores(stores.values());
}
/**
@@ -4150,5 +4150,4 @@ public class HRegion implements HeapSize
s.updateConfiguration();
}
}
-
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Mar 12 21:17:13 2014
@@ -19,7 +19,49 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Constructor;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
@@ -31,11 +73,41 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HMsg.Type;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownRowLockException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiPut;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiResponse;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -45,8 +117,16 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
-import org.apache.hadoop.hbase.ipc.*;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
import org.apache.hadoop.hbase.master.AssignmentPlan;
import org.apache.hadoop.hbase.master.RegionPlacement;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
@@ -54,7 +134,21 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.thrift.HBaseNiftyThriftServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CompoundBloomFilter;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ParamFormat;
+import org.apache.hadoop.hbase.util.ParamFormatter;
+import org.apache.hadoop.hbase.util.RuntimeHaltAbortStrategy;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -71,23 +165,7 @@ import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
-import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryUsage;
-import java.lang.management.RuntimeMXBean;
-import java.lang.reflect.Constructor;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Preconditions;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -105,7 +183,7 @@ public class HRegionServer implements HR
private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
private static final String UNABLE_TO_READ_MASTER_ADDRESS_ERR_MSG =
"Unable to read master address from ZooKeeper";
- private static final ArrayList<Put> emptyPutArray = new ArrayList<Put>();
+ private static final ArrayList<Put> emptyPutArray = new ArrayList<Put>(0);
private static final int DEFAULT_NUM_TRACKED_CLOSED_REGION = 3;
private static SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -141,7 +219,12 @@ public class HRegionServer implements HR
// If false, the file system has become unavailable
protected volatile boolean fsOk;
+ // TODO gauravm
+ // When both Hadoop RPC and Thrift RPC are switched on, what do we do here?
protected volatile HServerInfo serverInfo;
+ // HServerInfo for the RPC Server
+ protected volatile HServerInfo rpcServerInfo;
+
protected final Configuration conf;
private final ServerConnection connection;
@@ -267,6 +350,9 @@ public class HRegionServer implements HR
// reference to the Thrift Server.
volatile private HRegionThriftServer thriftServer;
+ // The nifty thrift server
+ volatile private HBaseNiftyThriftServer niftyThriftServer;
+
// Cache configuration and block cache reference
private final CacheConfig cacheConfig;
@@ -314,6 +400,8 @@ public class HRegionServer implements HR
// Configuration changes.
public static final ConfigurationManager configurationManager =
new ConfigurationManager();
+ private boolean useThrift;
+ private boolean useHadoopRPC;
public static long getResponseSizeLimit() {
return responseSizeLimit;
@@ -328,7 +416,10 @@ public class HRegionServer implements HR
public static volatile long openRegionDelay = 0;
/**
- * Starts a HRegionServer at the default location
+ * Starts a HRegionServer at the default location. This should be followed
+ * by a call to the initialize() method on the HRegionServer object, to start
+ * up the RPC servers.
+ *
* @param conf
* @throws IOException
*/
@@ -382,7 +473,6 @@ public class HRegionServer implements HR
HRegionServer.useSeekNextUsingHint =
conf.getBoolean("hbase.regionserver.scan.timestampfilter.allow_seek_next_using_hint", true);
- reinitialize();
SchemaMetrics.configureGlobally(conf);
cacheConfig = new CacheConfig(conf);
configurationManager.registerObserver(cacheConfig);
@@ -413,6 +503,7 @@ public class HRegionServer implements HR
new ThreadFactory() {
private int count = 1;
+ @Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "regionOpenCloseThread-" + count++);
t.setDaemon(true);
@@ -431,10 +522,11 @@ public class HRegionServer implements HR
/**
* Creates all of the state that needs to be reconstructed in case we are
* doing a restart. This is shared between the constructor and restart().
- * Both call it.
+ * Both call it. initialize() must be called outside the constructor, since the
+ * regionserver object would not yet be published (fully constructed) at that point.
* @throws IOException
*/
- private void reinitialize() throws IOException {
+ public void initialize() throws IOException {
this.restartRequested = false;
this.abortRequested = false;
this.stopRequestedAtStageOne.set(false);
@@ -442,16 +534,52 @@ public class HRegionServer implements HR
// Address is giving a default IP for the moment. Will be changed after
// calling the master.
- int port;
- if ((port = address.getPort()) == 0) {
- // start the RPC server to get the actual ephemeral port value
- this.server = HBaseRPC.getServer(this, address.getBindAddress(),
- address.getPort(),
- conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
- this.server.setErrorHandler(this);
- port = this.server.getListenerAddress().getPort();
+ int port = 0;
+ useThrift =
+ conf.getBoolean(HConstants.REGIONSERVER_USE_THRIFT,
+ HConstants.DEFAULT_REGIONSERVER_USE_THRIFT);
+ useHadoopRPC =
+ conf.getBoolean(HConstants.REGIONSERVER_USE_HADOOP_RPC,
+ HConstants.DEFAULT_REGIONSERVER_USE_HADOOP_RPC);
+ if (this.useHadoopRPC) {
+ if ((port = address.getPort()) == 0) {
+ // start the RPC server to get the actual ephemeral port value
+ this.server = HBaseRPC.getServer(this, address.getBindAddress(),
+ address.getPort(),
+ conf.getInt("hbase.regionserver.handler.count", 10),
+ false, conf);
+ this.server.setErrorHandler(this);
+ port = this.server.getListenerAddress().getPort();
+ }
+
+ this.rpcServerInfo = new HServerInfo(new HServerAddress(
+ new InetSocketAddress(address.getBindAddress(), port)),
+ System.currentTimeMillis(), machineName);
+ }
+ if (useThrift) {
+ int niftyServerPort =
+ conf.getInt(HConstants.REGIONSERVER_SWIFT_PORT,
+ HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT);
+ Class<? extends ThriftHRegionServer> thriftServerClass =
+ (Class<? extends ThriftHRegionServer>)
+ conf.getClass(HConstants.THRIFT_REGION_SERVER_IMPL, ThriftHRegionServer.class);
+
+ ThriftHRegionServer thriftServer;
+ try {
+ thriftServer = thriftServerClass.getConstructor(HRegionServer.class).newInstance(this);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ niftyThriftServer = new HBaseNiftyThriftServer(this.conf, thriftServer, niftyServerPort);
+
+ if (conf.getBoolean(
+ HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META,
+ HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META_DEFAULT)) {
+ port = niftyThriftServer.getPort();
+ }
+ isRpcServerRunning = true;
}
+
this.serverInfo = new HServerInfo(new HServerAddress(
new InetSocketAddress(address.getBindAddress(), port)),
System.currentTimeMillis(), machineName);
@@ -470,6 +598,18 @@ public class HRegionServer implements HR
}
}
+ public int getHadoopRPCServerPort() {
+ return this.serverInfo.getServerAddress().getPort();
+ }
+
+ public int getThriftServerPort() {
+ return this.niftyThriftServer.getPort();
+ }
+
+ public HBaseRpcMetrics getThriftMetrics() {
+ return niftyThriftServer.getRpcMetrics();
+ }
+
private void initializeZooKeeper() throws IOException {
boolean abortProcesstIfZKExpired = conf.getBoolean(
HConstants.ZOOKEEPER_SESSION_EXPIRED_ABORT_PROCESS, true);
@@ -587,14 +727,12 @@ public class HRegionServer implements HR
}
/**
- * if the current call from a client has closed his connection
- * @return
+ * TODO: adela - a task has already been created to enable this feature on
+ * swift: t2931033
+ *
+ * @return false
*/
public static boolean isCurrentConnectionClosed() {
- // this is checked just because of the unit tests
- if (callContext.get() != null) {
- return callContext.get().getConnection().getSocket().isClosed();
- }
return false;
}
@@ -708,7 +846,7 @@ public class HRegionServer implements HR
lastMsg = System.currentTimeMillis();
updateOutboundMsgs(outboundMessages);
outboundMessages.clear();
- if (this.quiesced.get() && onlineRegions.size() == 0) {
+ if (this.quiesced.get() && onlineRegions.isEmpty()) {
// We've just told the master we're exiting because we aren't
// serving any regions. So set the stop bit and exit.
LOG.info("Server quiesced and not serving any regions. " +
@@ -987,6 +1125,9 @@ public class HRegionServer implements HR
if (this.server != null) {
this.server.stop();
}
+ if (this.niftyThriftServer != null) {
+ this.niftyThriftServer.stop();
+ }
if (this.splitLogWorkers != null) {
for(SplitLogWorker splitLogWorker: splitLogWorkers) {
splitLogWorker.stop();
@@ -1611,7 +1752,7 @@ public class HRegionServer implements HR
long filesRead = 0;
long cntWriteException = 0;
long cntReadException = 0;
-
+
for (Statistics fsStatistic : FileSystem.getAllStatistics()) {
bytesRead += fsStatistic.getBytesRead();
bytesLocalRead += fsStatistic.getLocalBytesRead();
@@ -1622,7 +1763,7 @@ public class HRegionServer implements HR
cntWriteException += fsStatistic.getCntWriteException();
cntReadException += fsStatistic.getCntReadException();
}
-
+
this.metrics.bytesRead.set(bytesRead);
this.metrics.bytesLocalRead.set(bytesLocalRead);
this.metrics.bytesRackLocalRead.set(bytesRackLocalRead);
@@ -1674,12 +1815,12 @@ public class HRegionServer implements HR
}
};
this.cacheFlusher.start(n, handler);
-
+
// Initialize the hlog roller threads
- for (int i = 0; i < this.hlogRollers.length; i++) {
- Threads.setDaemonThreadRunning(this.hlogRollers[i], n + ".logRoller-" + i, handler);
+ for (int i = 0; i < this.hlogRollers.length; i++) {
+ Threads.setDaemonThreadRunning(this.hlogRollers[i], n + ".logRoller-" + i, handler);
}
-
+
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
Threads.setDaemonThreadRunning(this.majorCompactionChecker,
n + ".majorCompactionChecker", handler);
@@ -1716,17 +1857,19 @@ public class HRegionServer implements HR
}
}
- if (this.server == null) {
- // Start Server to handle client requests.
- this.server = HBaseRPC.getServer(this,
- serverInfo.getServerAddress().getBindAddress(),
- serverInfo.getServerAddress().getPort(),
- conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
- this.server.setErrorHandler(this);
+ if (this.useHadoopRPC) {
+ if (this.server == null) {
+ // Start Server to handle client requests.
+ this.server = HBaseRPC.getServer(this,
+ rpcServerInfo.getServerAddress().getBindAddress(),
+ rpcServerInfo.getServerAddress().getPort(),
+ conf.getInt("hbase.regionserver.handler.count", 10),
+ false, conf);
+ this.server.setErrorHandler(this);
+ }
+ this.server.start();
+ isRpcServerRunning = true;
}
- this.server.start();
- isRpcServerRunning = true;
int numSplitLogWorkers = conf.getInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, 3);
// Create the log splitting worker and start it
this.splitLogWorkers = new ArrayList<SplitLogWorker>(numSplitLogWorkers);
@@ -1739,8 +1882,8 @@ public class HRegionServer implements HR
}
// start the scanner prefetch threadpool
int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
- scanPrefetchThreadPool =
- Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS,
+ scanPrefetchThreadPool =
+ Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS,
new DaemonThreadFactory("scan-prefetch-"));
LOG.info("HRegionServer started at: " +
@@ -1764,13 +1907,13 @@ public class HRegionServer implements HR
}
return true;
}
-
- private boolean isAllHLogRollerAlive() {
- boolean res = true;
- for (int i = 0; i < this.hlogRollers.length; i++) {
- res = res && this.hlogRollers[i].isAlive();
- }
- return res;
+
+ private boolean isAllHLogRollerAlive() {
+ boolean res = true;
+ for (int i = 0; i < this.hlogRollers.length; i++) {
+ res = res && this.hlogRollers[i].isAlive();
+ }
+ return res;
}
/*
@@ -1804,7 +1947,7 @@ public class HRegionServer implements HR
@Override
public List<String> getHLogsList(boolean rollCurrentHLog) throws IOException {
List <String> allHLogsList = new ArrayList<String>();
-
+
for (int i = 0; i < hlogs.length; i++) {
if (rollCurrentHLog) {
this.hlogs[i].rollWriter();
@@ -1814,7 +1957,7 @@ public class HRegionServer implements HR
return allHLogsList;
}
-
+
/**
* Return the i th HLog in this region server
*/
@@ -1825,7 +1968,7 @@ public class HRegionServer implements HR
public int getTotalHLogCnt() {
return this.hlogs.length;
}
-
+
/**
* Sets a flag that will cause all the HRegionServer threads to shut down
* in an orderly fashion. Used by unit tests.
@@ -1907,9 +2050,9 @@ public class HRegionServer implements HR
Threads.shutdown(this.majorCompactionChecker);
Threads.shutdown(this.workerThread);
this.cacheFlusher.join();
- for (int i = 0; i < this.hlogRollers.length; i++) {
- Threads.shutdown(this.hlogRollers[i]);
- }
+ for (int i = 0; i < this.hlogRollers.length; i++) {
+ Threads.shutdown(this.hlogRollers[i]);
+ }
this.compactSplitThread.join();
}
@@ -1932,10 +2075,20 @@ public class HRegionServer implements HR
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
- master = (HMasterRegionInterface)HBaseRPC.getProxy(
- HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
- masterAddress.getInetSocketAddress(), this.conf, this.rpcTimeoutToMaster,
- HBaseRPCOptions.DEFAULT);
+ if (this.conf.getBoolean(HConstants.CLIENT_TO_MASTER_USE_THRIFT,
+ HConstants.CLIENT_TO_MASTER_USE_THRIFT_DEFAULT)) {
+ InetSocketAddress addr = new InetSocketAddress(masterAddress
+ .getInetSocketAddress().getHostName(), conf.getInt(
+ HConstants.MASTER_THRIFT_PORT,
+ HConstants.MASTER_THRIFT_PORT_DEFAULT));
+ master = (HMasterRegionInterface) HBaseThriftRPC.getClient(addr,
+ this.conf, HMasterRegionInterface.class, HBaseRPCOptions.DEFAULT);
+ } else {
+ master = (HMasterRegionInterface) HBaseRPC.getProxy(
+ HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
+ masterAddress.getInetSocketAddress(), this.conf,
+ this.rpcTimeoutToMaster, HBaseRPCOptions.DEFAULT);
+ }
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();
@@ -2197,7 +2350,7 @@ public class HRegionServer implements HR
if (region == null) {
try {
zkUpdater.startRegionOpenEvent(null, true);
-
+
// Assign one of the HLogs to the new opening region.
// If the region has been opened before, assign the previous HLog instance to that region.
Integer hLogIndex = null;
@@ -2206,12 +2359,12 @@ public class HRegionServer implements HR
this.regionNameToHLogIDMap.put(regionInfo.getRegionNameAsString(), hLogIndex);
}
region = instantiateRegion(regionInfo, this.hlogs[hLogIndex.intValue()]);
- LOG.info("Initiate the region: " + regionInfo.getRegionNameAsString() + " with HLog #" +
+ LOG.info("Initiate the region: " + regionInfo.getRegionNameAsString() + " with HLog #" +
hLogIndex);
-
+
// Set up the favorite nodes for all the HFile for that region
setFavoredNodes(region, favoredNodes);
-
+
// Startup a compaction early if one is needed, if store has references
// or has too many store files
for (Store s : region.getStores().values()) {
@@ -2395,10 +2548,10 @@ public class HRegionServer implements HR
/** Called either when the master tells us to restart or from stop()
* @throws Throwable */
ArrayList<HRegion> closeAllRegions() {
- ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
+ ArrayList<HRegion> regionsToClose = null;
this.lock.writeLock().lock();
try {
- regionsToClose.addAll(onlineRegions.values());
+ regionsToClose = new ArrayList<HRegion>(onlineRegions.values());
} finally {
this.lock.writeLock().unlock();
}
@@ -2426,7 +2579,8 @@ public class HRegionServer implements HR
createRegionCloseCallable(regionsToClose.get(i))));
}
- ArrayList<HRegion> regionsClosed = new ArrayList<HRegion>();
+ ArrayList<HRegion> regionsClosed = new ArrayList<HRegion>(
+ regionsToClose.size());
for (int i = 0; i < futures.size(); i++ ) {
Future<Object> future = futures.get(i);
try {
@@ -2445,6 +2599,7 @@ public class HRegionServer implements HR
private Callable<Object> createRegionOpenCallable(final HRegionInfo rinfo,
final String favouredNodes) {
return new Callable<Object>() {
+ @Override
public Object call() throws IOException {
openRegion(rinfo, favouredNodes);
return null;
@@ -2454,6 +2609,7 @@ public class HRegionServer implements HR
private Callable<Object> createRegionCloseCallable(final HRegion region) {
return new Callable<Object>() {
+ @Override
public Object call() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
@@ -2492,7 +2648,7 @@ public class HRegionServer implements HR
closeRegionsInParallel(regionsToClose);
this.quiesced.set(true);
- if (onlineRegions.size() == 0) {
+ if (onlineRegions.isEmpty()) {
outboundMsgs.add(REPORT_EXITING);
} else {
outboundMsgs.add(REPORT_QUIESCED);
@@ -2504,9 +2660,9 @@ public class HRegionServer implements HR
//
@Override
- public HRegionInfo getRegionInfo(final byte [] regionName)
- throws NotServingRegionException {
- return getRegion(regionName).getRegionInfo();
+ public HRegionInfo getRegionInfo(final byte[] regionName)
+ throws NotServingRegionException {
+ return getRegion(regionName).getRegionInfo();
}
@@ -2526,13 +2682,18 @@ public class HRegionServer implements HR
}
}
+
/** {@inheritDoc} */
@Override
- public Result get(byte [] regionName, Get get) throws IOException {
+ public Result get(byte[] regionName, Get get) throws IOException {
checkOpen();
try {
HRegion region = getRegion(regionName);
- return region.get(get, getLockFromId(get.getLockId()));
+ Result r = region.get(get, getLockFromId(get.getLockId()));
+ if (r == null) {
+ return Result.SENTINEL_RESULT;
+ }
+ return r;
} catch(Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
@@ -2569,8 +2730,8 @@ public class HRegionServer implements HR
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
- for (RowMutations arm: armList) {
- region.mutateRow(arm);
+ for (RowMutations rm: armList) {
+ region.mutateRow(rm);
}
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
@@ -2718,7 +2879,7 @@ public class HRegionServer implements HR
//
@Override
- public long openScanner(byte [] regionName, Scan scan)
+ public long openScanner(final byte [] regionName, final Scan scan)
throws IOException {
checkOpen();
NullPointerException npe = null;
@@ -2757,7 +2918,7 @@ public class HRegionServer implements HR
@ParamFormat(clazz = ScanParamsFormatter.class)
@Override
public Result next(final long scannerId) throws IOException {
- Result [] res = next(scannerId, 1);
+ Result[] res = next(scannerId, 1);
if(res == null || res.length == 0) {
return null;
}
@@ -2793,10 +2954,39 @@ public class HRegionServer implements HR
@ParamFormat(clazz = ScanParamsFormatter.class)
@Override
- public Result [] next(final long scannerId, int nbRows) throws IOException {
+ public Result[] next(final long scannerId, int nbRows) throws IOException {
+ Result[] ret = nextInternal(scannerId, nbRows);
+ if (isScanDone(ret)) {
+ return null;
+ }
+ return ret;
+ }
+
+ /**
+ * Tells whether the scan on this region is complete and that the client
+ * scanner should move onto the next region
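+ * @param values the results returned for this region by the scanner
+ * @return true if values is null or holds exactly one sentinel result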
+ */
+ public static boolean isScanDone(Result[] values) {
+ if (values == null) return true;
+ if (values.length == 1) {
+ return values[0].isSentinelResult();
+ }
+ return false;
+ }
+
+ /**
+ * This function returns the results for the next request. This function is used across
+ * Thrift and HadoopRPC.
+ * Termination of scan is represented by {@link Result#SENTINEL_RESULT_ARRAY}
+ * @return
+ */
+ protected Result[] nextInternal(final long scannerId, int nbRows)
+ throws IOException {
try {
String scannerName = String.valueOf(scannerId);
- // HRegionServer only deals with Region Scanner,
+ // HRegionServer only deals with Region Scanner,
// thus, we just typecast directly
RegionScanner s = (RegionScanner)this.scanners.get(scannerName);
if (s == null) {
@@ -2882,7 +3072,8 @@ public class HRegionServer implements HR
}
@Override
- public int delete(final byte[] regionName, final List<Delete> deletes)
+ public int delete(final byte[] regionName,
+ final List<Delete> deletes)
throws IOException {
return applyMutations(regionName, deletes, "multidelete_");
}
@@ -3127,6 +3318,7 @@ public class HRegionServer implements HR
return region.getStoreFileList(columnFamilies);
}
+ @Override
public List<String> getStoreFileList(byte[] regionName)
throws IllegalArgumentException {
HRegion region = getOnlineRegion(regionName);
@@ -3156,7 +3348,8 @@ public class HRegionServer implements HR
/**
* Flushes the given region if lastFlushTime < ifOlderThanTS
*/
- public void flushRegion(byte[] regionName, long ifOlderThanTS)
+ @Override
+ public void flushRegion(byte[] regionName, long ifOlderThanTS)
throws IllegalArgumentException, IOException {
HRegion region = getOnlineRegion(regionName);
if (region == null) {
@@ -3169,6 +3362,7 @@ public class HRegionServer implements HR
/**
* @return the earliest time a store in the given region was flushed.
*/
+ @Override
public long getLastFlushTime(byte[] regionName) {
HRegion region = getOnlineRegion(regionName);
if (region == null) {
@@ -3288,7 +3482,7 @@ public class HRegionServer implements HR
public HRegion getOnlineRegion(final byte [] regionName) {
return onlineRegions.get(Bytes.mapKey(regionName));
}
-
+
/** @return reference to FlushRequester */
public FlushRequester getFlushRequester() {
return this.cacheFlusher;
@@ -3434,7 +3628,15 @@ public class HRegionServer implements HR
/**
* @return Info on port this server has bound to, etc.
*/
- public HServerInfo getServerInfo() { return this.serverInfo; }
+ public HServerInfo getServerInfo() {
+ try {
+ return getHServerInfo();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ }
+ }
/** {@inheritDoc} */
@Override
@@ -3480,8 +3682,8 @@ public class HRegionServer implements HR
public MultiResponse multiAction(MultiAction mActions) throws IOException {
checkOpen();
MultiResponse response = new MultiResponse();
- if (mActions.deletes != null) {
- for (Map.Entry<byte[], List<Delete>> e : mActions.deletes.entrySet()) {
+ if (mActions.getDeletes() != null) {
+ for (Map.Entry<byte[], List<Delete>> e : mActions.getDeletes().entrySet()) {
byte[] regionName = e.getKey();
Object result;
@@ -3495,8 +3697,8 @@ public class HRegionServer implements HR
}
}
- if (mActions.puts != null) {
- for (Map.Entry<byte[], List<Put>> e : mActions.puts.entrySet()) {
+ if (mActions.getPuts() != null) {
+ for (Map.Entry<byte[], List<Put>> e : mActions.getPuts().entrySet()) {
byte[] regionName = e.getKey();
Object result;
@@ -3510,8 +3712,8 @@ public class HRegionServer implements HR
}
}
- if (mActions.gets != null) {
- for (Map.Entry<byte[], List<Get>> e : mActions.gets.entrySet()) {
+ if (mActions.getGets() != null) {
+ for (Map.Entry<byte[], List<Get>> e : mActions.getGets().entrySet()) {
byte[] regionName = e.getKey();
Object result;
@@ -3626,11 +3828,13 @@ public class HRegionServer implements HR
* @return HRegionServer instance.
*/
public static HRegionServer constructRegionServer(Class<? extends HRegionServer> regionServerClass,
- final Configuration conf2) {
+ final Configuration inputConf) {
try {
Constructor<? extends HRegionServer> c =
regionServerClass.getConstructor(Configuration.class);
- return c.newInstance(conf2);
+ HRegionServer server = c.newInstance(inputConf);
+ server.initialize();
+ return server;
} catch (Exception e) {
throw new RuntimeException("Failed construction of " +
"Master: " + regionServerClass.toString(), e);
@@ -3741,6 +3945,7 @@ public class HRegionServer implements HR
}
return counter;
}
+
/**
* @param args
*/
@@ -3786,6 +3991,7 @@ public class HRegionServer implements HR
/**
* Reload the configuration from disk.
*/
+ @Override
public void updateConfiguration() {
LOG.info("Reloading the configuration from disk.");
// Reload the configuration from disk.
@@ -3815,8 +4021,8 @@ public class HRegionServer implements HR
"hbase.regionserver.enable.serverside.profiling", false);
if (origProfiling != newProfiling) {
enableServerSideProfilingForAllCalls.set(newProfiling);
- LOG.info("enableServerSideProfilingForAllCalls changed from " +
- origProfiling + " to " + newProfiling);
+ LOG.info("enableServerSideProfilingForAllCalls changed from "
+ + origProfiling + " to " + newProfiling);
}
if (threads != this.quorumReadThreadsMax) {
@@ -3861,4 +4067,9 @@ public class HRegionServer implements HR
return HRegionUtilities.adjustHistogramBoundariesToRegionBoundaries(
hist.getUniformBuckets(), region.getStartKey(), region.getEndKey());
}
+
+ @Override
+ public void close() throws Exception {
+ // TODO Auto-generated method stub
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Wed Mar 12 21:17:13 2014
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.convertIOException;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -42,9 +44,6 @@ import org.apache.hadoop.hbase.thrift.ge
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
-
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.convertIOException;
-
import org.apache.thrift.TException;
/**
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Wed Mar 12 21:17:13 2014
@@ -48,7 +48,7 @@ public interface InternalScanner extends
* @throws IOException e
*/
public boolean next(List<KeyValue> results) throws IOException;
-
+
/**
* Grab the next row's worth of values.
* @param results return output array
@@ -67,7 +67,7 @@ public interface InternalScanner extends
* @throws IOException e
*/
public boolean next(List<KeyValue> result, int limit) throws IOException;
-
+
/**
* Grab the next row's worth of values with a limit on the number of values
* to return.
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Wed Mar 12 21:17:13 2014
@@ -20,16 +20,16 @@
package org.apache.hadoop.hbase.regionserver;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.KeyValueContext;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValueContext;
+
/**
* Implements a heap merge across any number of KeyValueScanners.
* <p>
@@ -83,6 +83,7 @@ public class KeyValueHeap extends NonLaz
}
}
+ @Override
public KeyValue peek() {
if (this.current == null) {
return null;
@@ -90,6 +91,7 @@ public class KeyValueHeap extends NonLaz
return this.current.peek();
}
+ @Override
public KeyValue next() throws IOException {
if(this.current == null) {
return null;
@@ -121,6 +123,7 @@ public class KeyValueHeap extends NonLaz
* @param limit limit on row count to get
* @return true if there are more keys, false if all scanners are done
*/
+ @Override
public boolean next(List<KeyValue> result, int limit) throws IOException {
return next(result, limit, null, null);
}
@@ -149,6 +152,7 @@ public class KeyValueHeap extends NonLaz
* @param metric the metric name
* @return true if there are more keys, false if all scanners are done
*/
+ @Override
public boolean next(List<KeyValue> result, int limit, String metric,
KeyValueContext kvContext) throws IOException {
if (this.current == null) {
@@ -186,6 +190,7 @@ public class KeyValueHeap extends NonLaz
* @param result
* @return true if there are more keys, false if all scanners are done
*/
+ @Override
public boolean next(List<KeyValue> result) throws IOException {
return next(result, -1);
}
@@ -204,6 +209,7 @@ public class KeyValueHeap extends NonLaz
public KVScannerComparator(KVComparator kvComparator) {
this.kvComparator = kvComparator;
}
+ @Override
public int compare(KeyValueScanner left, KeyValueScanner right) {
int comparison = compare(left.peek(), right.peek());
if (comparison != 0) {
@@ -239,6 +245,7 @@ public class KeyValueHeap extends NonLaz
}
}
+ @Override
public void close() {
if (this.current != null) {
this.current.close();
@@ -426,7 +433,8 @@ public class KeyValueHeap extends NonLaz
* since it is not included in the heap
*/
List<KeyValueScanner> getActiveScanners() {
- List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
+ List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>(
+ this.heap.size() + 1);
allScanners.addAll(this.heap);
allScanners.add(current);
return allScanners;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Wed Mar 12 21:17:13 2014
@@ -251,7 +251,7 @@ class MemStoreFlusher implements FlushRe
* A flushRegion that checks store file count. If too many, puts the flush
* on delay queue to retry later.
* @param fqe
- * @return true if the region was successfully flushed, false otherwise. If
+ * @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the region was
* not flushed.
*/
@@ -452,7 +452,7 @@ class MemStoreFlusher implements FlushRe
public int getRequeueCount() {
return this.requeueCount;
}
-
+
/**
* @param when When to expire, when to come up out of the queue.
* Specify in milliseconds. This method adds System.currentTimeMillis()
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java Wed Mar 12 21:17:13 2014
@@ -49,7 +49,7 @@ public abstract class NonLazyKeyValueSca
"non-lazy scanner");
}
- public static boolean doRealSeek(KeyValueScanner scanner,
+ public static boolean doRealSeek(KeyValueScanner scanner,
KeyValue kv, boolean forward) throws IOException {
return forward ? scanner.reseek(kv) : scanner.seek(kv);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java Wed Mar 12 21:17:13 2014
@@ -29,7 +29,7 @@ public class RegionOverloadedException e
* @return the new exception with complete information
*/
public static RegionOverloadedException create(RegionOverloadedException roe,
- List<Throwable> exceptions, int waitMillis) {
+ List<Throwable> exceptions, long waitMillis) {
StringBuilder sb = new StringBuilder(roe.getMessage());
for (Throwable t : exceptions) {
if (t != roe) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Wed Mar 12 21:17:13 2014
@@ -297,9 +297,14 @@ public class RegionScanner implements In
prefetchScanFuture = scanPrefetchThreadPool.submit(callable);
}
rowReadCnt.addAndGet(scanResult.outResults.length);
- return scanResult.outResults == null ||
- (isFilterDone() && scanResult.outResults.length == 0) ?
- null : scanResult.outResults;
+ Result[] ret;
+ if (scanResult.outResults == null ||
+ (isFilterDone() && scanResult.outResults.length == 0)) {
+ ret = Result.SENTINEL_RESULT_ARRAY;
+ } else {
+ ret = scanResult.outResults;
+ }
+ return ret;
}
/**
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Wed Mar 12 21:17:13 2014
@@ -80,6 +80,8 @@ public class ScanQueryMatcher {
private final long effectiveTS;
+ private boolean isDeleteColumnUsageEnabled = false;
+
/**
* Constructs a ScanQueryMatcher for a Scan.
* @param scan
@@ -91,10 +93,10 @@ public class ScanQueryMatcher {
NavigableSet<byte[]> columnSet, KeyValue.KeyComparator rowComparator,
int maxVersions, long readPointToUse,
long retainDeletesInOutputUntil,
- long oldestUnexpiredTS) {
+ long oldestUnexpiredTS, boolean isDeleteColumnUsageEnabled) {
this(scan, family, columnSet, rowComparator, maxVersions, readPointToUse,
retainDeletesInOutputUntil, oldestUnexpiredTS,
- HConstants.LATEST_TIMESTAMP);
+ HConstants.LATEST_TIMESTAMP, isDeleteColumnUsageEnabled);
}
/**
@@ -108,7 +110,7 @@ public class ScanQueryMatcher {
public ScanQueryMatcher(Scan scan, byte[] family,
NavigableSet<byte[]> columnSet, KeyValue.KeyComparator rowComparator,
int maxVersions, long readPointToUse, long retainDeletesInOutputUntil,
- long oldestUnexpiredTS, long oldestFlashBackTS) {
+ long oldestUnexpiredTS, long oldestFlashBackTS, boolean isDeleteColumnUsageEnabled) {
this.tr = scan.getTimeRange();
this.oldestStamp = oldestUnexpiredTS;
this.rowComparator = rowComparator;
@@ -121,9 +123,10 @@ public class ScanQueryMatcher {
this.retainDeletesInOutputUntil = retainDeletesInOutputUntil;
this.maxReadPointToTrackVersions = readPointToUse;
this.oldestFlashBackTS = oldestFlashBackTS;
+ this.isDeleteColumnUsageEnabled = isDeleteColumnUsageEnabled;
// Single branch to deal with two types of reads (columns vs all in family)
- if (columnSet == null || columnSet.size() == 0) {
+ if (columnSet == null || columnSet.isEmpty()) {
// there is always a null column in the wildcard column query.
hasNullColumn = true;
@@ -133,7 +136,6 @@ public class ScanQueryMatcher {
} else {
// whether there is null column in the explicit column query
hasNullColumn = (columnSet.first().length == 0);
-
// We can share the ExplicitColumnTracker, diff is we reset
// between rows, not between storefiles.
// Note that we do not use oldestFlashBackTS here since
@@ -152,13 +154,13 @@ public class ScanQueryMatcher {
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, KeyValue.KeyComparator rowComparator,
- int maxVersions, long oldestUnexpiredTS) {
+ int maxVersions, long oldestUnexpiredTS, boolean isDeleteColumnUsageEnabled) {
// By default we will not include deletes.
// Deletes are included explicitly (for minor compaction).
this(scan, family, columns, rowComparator, maxVersions,
Long.MAX_VALUE, // max Readpoint to Track versions
Long.MAX_VALUE, // do not include deletes
- oldestUnexpiredTS);
+ oldestUnexpiredTS, isDeleteColumnUsageEnabled);
}
/**
@@ -170,11 +172,12 @@ public class ScanQueryMatcher {
* - got to the next row (MatchCode.DONE)
*
* @param kv KeyValue to check
+ * @param allScanners scanners from the current heap
* @return The match code instance.
* @throws IOException in case there is an internal consistency problem
* caused by a data corruption.
*/
- public MatchCode match(KeyValue kv) throws IOException {
+ public MatchCode match(KeyValue kv, List<KeyValueScanner> allScanners) throws IOException {
if (kv.getTimestamp() > effectiveTS) {
return MatchCode.SEEK_TO_EFFECTIVE_TS;
}
@@ -264,7 +267,22 @@ public class ScanQueryMatcher {
int timestampComparison = tr.compare(timestamp);
if (timestampComparison >= 1) {
- return MatchCode.SKIP;
+ // we have to have the delete column bloom enabled to be able to seek to
+ // the exact kv (if we don't know whether there are any deletes, we are
+ // unable to do that)
+ if (isDeleteColumnUsageEnabled) {
+ KeyValue queriedKV = kv.createFirstOnRowColTS(Math.max(tr.getMax() - 1,
+ tr.getMin()));
+ for (KeyValueScanner kvScanner : allScanners) {
+ if (kvScanner.passesDeleteColumnCheck(queriedKV)) {
+ return MatchCode.SKIP;
+ }
+ }
+ HRegionServer.numOptimizedSeeks.incrementAndGet();
+ return MatchCode.SEEK_TO_EXACT_KV;
+ } else {
+ return MatchCode.SKIP;
+ }
} else if (timestampComparison <= -1) {
return getNextRowOrNextColumn(bytes, offset, qualLength);
}
@@ -497,5 +515,10 @@ public class ScanQueryMatcher {
* Include KeyValue and done with row, seek to next.
*/
INCLUDE_AND_SEEK_NEXT_ROW,
+
+ /**
+ * Go to the exact KV
+ */
+ SEEK_TO_EXACT_KV
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Mar 12 21:17:13 2014
@@ -193,7 +193,7 @@ public class SplitLogWorker implements R
try {
LOG.info("SplitLogWorker starting");
this.watcher.createZNodeIfNotExists(this.watcher.splitLogZNode, new byte[0],
- CreateMode.PERSISTENT, false /* set watch? */);
+ CreateMode.PERSISTENT, false /* set watch? */);
this.watcher.registerListener(this);
int res;
// the above creation might have failed. Don't proceed until someone
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java Wed Mar 12 21:17:13 2014
@@ -46,7 +46,7 @@ class SplitRequest implements Runnable {
// Didn't need to be split
return;
}
-
+
// When a region is split, the META table needs to be updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
@@ -64,14 +64,14 @@ class SplitRequest implements Runnable {
}
t = meta;
}
-
+
// Mark old region as offline and split in META.
// NOTE: there is no need for retry logic here. HTable does it for us.
oldRegionInfo.setOffline(true);
oldRegionInfo.setSplit(true);
// Inform the HRegionServer that the parent HRegion is no-longer online.
server.removeFromOnlineRegions(oldRegionInfo);
-
+
Put put = new Put(oldRegionInfo.getRegionName());
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
Writables.getBytes(oldRegionInfo));
@@ -84,11 +84,11 @@ class SplitRequest implements Runnable {
put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER, Writables
.getBytes(newRegions[1].getRegionInfo()));
t.put(put);
-
+
// If we crash here, then the daughters will not be added and we'll have
// an offlined parent but no daughters to take up the slack. hbase-2244
// adds fixup to the metascanners.
-
+
// Add new regions to META
for (int i = 0; i < newRegions.length; i++) {
put = new Put(newRegions[i].getRegionName());
@@ -96,15 +96,15 @@ class SplitRequest implements Runnable {
Writables.getBytes(newRegions[i].getRegionInfo()));
t.put(put);
}
-
+
// If we crash here, the master will not know of the new daughters and they
// will not be assigned. The metascanner when it runs will notice and take
// care of assigning the new daughters.
-
+
// Now tell the master about the new regions
server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
newRegions[1].getRegionInfo());
-
+
LOG.info("region split, META updated, and report to master all"
+ " successful. Old region=" + oldRegionInfo.toString()
+ ", new regions: " + newRegions[0].toString() + ", "
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Mar 12 21:17:13 2014
@@ -462,10 +462,8 @@ public class Store extends SchemaConfigu
*/
protected List<StoreFile> loadStoreFiles(FileStatus[] files)
throws IOException {
- ArrayList<StoreFile> results = new ArrayList<StoreFile>();
-
if (files == null || files.length == 0) {
- return results;
+ return new ArrayList<StoreFile>(0);
}
// initialize the thread pool for opening store files in parallel..
ThreadPoolExecutor storeFileOpenerThreadPool =
@@ -490,6 +488,7 @@ public class Store extends SchemaConfigu
// open each store file in parallel
completionService.submit(new Callable<StoreFile>() {
+ @Override
public StoreFile call() throws IOException {
StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
family.getBloomFilterType(), dataBlockEncoder,
@@ -502,6 +501,7 @@ public class Store extends SchemaConfigu
totalValidStoreFile++;
}
+ ArrayList<StoreFile> results = new ArrayList<StoreFile>(totalValidStoreFile);
try {
for (int i = 0; i < totalValidStoreFile; i++) {
Future<StoreFile> future = completionService.take();
@@ -624,7 +624,9 @@ public class Store extends SchemaConfigu
// Append the new storefile into the list
this.lock.writeLock().lock();
try {
- ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
+ ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(
+ storefiles.size() + 1);
+ newFiles.addAll(storefiles);
newFiles.add(sf);
this.storefiles = sortAndClone(newFiles);
} finally {
@@ -687,6 +689,7 @@ public class Store extends SchemaConfigu
new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
for (final StoreFile f : result) {
completionService.submit(new Callable<Void>() {
+ @Override
public Void call() throws IOException {
if (deleteStoreFile) {
f.deleteReader();
@@ -768,7 +771,7 @@ public class Store extends SchemaConfigu
long smallestReadPoint = region.getSmallestReadPoint();
long flushed = 0;
// Don't flush if there are no entries.
- if (snapshot.size() == 0) {
+ if (snapshot.isEmpty()) {
return null;
}
@@ -979,7 +982,9 @@ public class Store extends SchemaConfigu
throws IOException {
this.lock.writeLock().lock();
try {
- ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
+ ArrayList<StoreFile> newList = new ArrayList<StoreFile>(
+ storefiles.size() + 1);
+ newList.addAll(storefiles);
newList.add(sf);
storefiles = sortAndClone(newList);
@@ -1242,8 +1247,7 @@ public class Store extends SchemaConfigu
}
}
- List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
- return compactionManager.isMajorCompaction(candidates);
+ return compactionManager.isMajorCompaction(this.storefiles);
}
boolean isPeakTime(int currentHour) {
@@ -1523,15 +1527,22 @@ public class Store extends SchemaConfigu
public HFileHistogram getHistogram() throws IOException {
this.lock.readLock().lock();
try {
- if (hist != null) return hist;
- List<HFileHistogram> histograms = new ArrayList<HFileHistogram>();
- if (storefiles.size() == 0) return hist;
+ if (hist != null) {
+ return hist;
+ }
+ if (storefiles.isEmpty()) {
+ return null;
+ }
+ HFileHistogram h = null;
for (StoreFile file : this.storefiles) {
HFileHistogram hist = file.getHistogram();
- if (hist != null) histograms.add(hist);
+ if (hist != null) {
+ if (h == null) {
+ h = hist.create(hist.getBinCount());
+ }
+ h.merge(hist);
+ }
}
- if (histograms.size() == 0) return hist;
- HFileHistogram h = histograms.get(0).compose(histograms);
this.hist = h;
return hist;
} catch (Exception e) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Mar 12 21:17:13 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.Inje
import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+
/**
* Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
* into List<KeyValue> for a single row.
@@ -138,7 +139,7 @@ public class StoreScanner extends NonLaz
store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()), Long.MAX_VALUE,
Long.MAX_VALUE, // do not include the deletes
- oldestUnexpiredTS);
+ oldestUnexpiredTS, isUsingDeleteColBloom);
}
public synchronized void initialize() throws IOException {
@@ -218,7 +219,7 @@ public class StoreScanner extends NonLaz
new ScanQueryMatcher(scan, store.getFamily().getName(), null,
store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()), smallestReadPoint,
- retainDeletesInOutputUntil, oldestUnexpiredTS, oldestFlashBackTS);
+ retainDeletesInOutputUntil, oldestUnexpiredTS, oldestFlashBackTS, isUsingDeleteColBloom);
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
scanners = selectScannersFrom(scanners);
@@ -256,7 +257,7 @@ public class StoreScanner extends NonLaz
comparator.getRawComparator(), scan.getMaxVersions(),
Long.MAX_VALUE,
retainDeletesInOutputUntil,
- oldestUnexpiredTS);
+ oldestUnexpiredTS, isUsingDeleteColBloom);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@@ -420,7 +421,7 @@ public class StoreScanner extends NonLaz
"smaller key " + kv + " in cf " + store);
}
prevKV = kv;
- ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
+ ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv, this.heap.getActiveScanners());
if ((qcode == MatchCode.INCLUDE) ||
(qcode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) ||
(qcode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
@@ -432,7 +433,12 @@ public class StoreScanner extends NonLaz
qcode = keyValueAggregator.nextAction(qcode);
}
}
- switch(qcode) {
+ switch (qcode) {
+ case SEEK_TO_EXACT_KV:
+ KeyValue queriedKV = kv.createFirstOnRowColTS(Math.max(
+ matcher.tr.getMax() - 1, matcher.tr.getMin()));
+ reseek(queriedKV);
+ break;
case SEEK_TO_EFFECTIVE_TS:
reseek(matcher.getKeyForEffectiveTSOnRow(kv));
break;
@@ -748,3 +754,4 @@ public class StoreScanner extends NonLaz
return true;
}
}
+