Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 21:58:32 UTC
svn commit: r1445918 [10/29] - in /hbase/branches/hbase-7290: ./ bin/ conf/
dev-support/ hbase-client/ hbase-common/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/
hbase-common/src/ma...
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java Wed Feb 13 20:58:23 2013
@@ -61,7 +61,6 @@ public abstract class TableEventHandler
protected final MasterServices masterServices;
protected final byte [] tableName;
protected final String tableNameStr;
- protected boolean persistedToZk = false;
public TableEventHandler(EventType eventType, byte [] tableName, Server server,
MasterServices masterServices)
@@ -111,10 +110,7 @@ public abstract class TableEventHandler
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
} catch (KeeperException e) {
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
- } finally {
- // notify the waiting thread that we're done persisting the request
- setPersist();
- }
+ }
}
public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException {
@@ -165,29 +161,6 @@ public abstract class TableEventHandler
return done;
}
- /**
- * Table modifications are processed asynchronously, but provide an API for
- * you to query their status.
- *
- * @throws IOException
- */
- public synchronized void waitForPersist() throws IOException {
- if (!persistedToZk) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw (IOException) new InterruptedIOException().initCause(ie);
- }
- assert persistedToZk;
- }
- }
-
- private synchronized void setPersist() {
- if (!persistedToZk) {
- persistedToZk = true;
- notify();
- }
- }
/**
* Gets a TableDescriptor from the masterServices. Can throw exceptions.
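For context, the waitForPersist/setPersist pair deleted above is an instance of the guarded wait/notify idiom. A minimal self-contained sketch of that idiom (class name illustrative, not HBase API; the sketch uses a while-loop guard and notifyAll(), the generally safer variant of what the deleted code did with if/notify()):

    import java.io.IOException;
    import java.io.InterruptedIOException;

    // Guarded wait/notify latch: one side blocks until the other flips a flag.
    class PersistLatch {
      private boolean persisted = false;

      // Blocks the caller until setPersist() has run.
      public synchronized void waitForPersist() throws IOException {
        while (!persisted) {          // loop guards against spurious wakeups
          try {
            wait();
          } catch (InterruptedException ie) {
            throw (IOException) new InterruptedIOException().initCause(ie);
          }
        }
      }

      // Flips the flag once and wakes all waiters.
      public synchronized void setPersist() {
        if (!persisted) {
          persisted = true;
          notifyAll();
        }
      }
    }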
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java Wed Feb 13 20:58:23 2013
@@ -55,11 +55,8 @@ public class TableModifyFamilyHandler ex
if (cpHost != null) {
cpHost.preModifyColumnHandler(this.tableName, this.familyDesc);
}
- // Update table descriptor in HDFS
- HTableDescriptor htd =
- this.masterServices.getMasterFileSystem().modifyColumn(tableName, familyDesc);
- // Update in-memory descriptor cache
- this.masterServices.getTableDescriptors().add(htd);
+ // Update table descriptor
+ this.masterServices.getMasterFileSystem().modifyColumn(tableName, familyDesc);
if (cpHost != null) {
cpHost.postModifyColumnHandler(this.tableName, this.familyDesc);
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java Wed Feb 13 20:58:23 2013
@@ -79,17 +79,18 @@ public abstract class LogMonitoring {
private static void dumpTailOfLog(File f, PrintWriter out, long tailKb)
throws IOException {
FileInputStream fis = new FileInputStream(f);
+ BufferedReader r = null;
try {
FileChannel channel = fis.getChannel();
channel.position(Math.max(0, channel.size() - tailKb*1024));
- BufferedReader r = new BufferedReader(
- new InputStreamReader(fis));
+ r = new BufferedReader(new InputStreamReader(fis));
r.readLine(); // skip the first partial line
String line;
while ((line = r.readLine()) != null) {
out.println(line);
}
} finally {
+ if (r != null) IOUtils.closeStream(r);
IOUtils.closeStream(fis);
}
}
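The fix above hoists the BufferedReader out of the try block so the finally clause can close it; a standalone sketch of the same tail-dump logic using only the JDK (no Hadoop IOUtils; class name illustrative):

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.nio.channels.FileChannel;

    class LogTail {
      // Prints roughly the last tailKb kilobytes of f.
      static void dumpTailOfLog(File f, PrintWriter out, long tailKb)
          throws IOException {
        FileInputStream fis = new FileInputStream(f);
        BufferedReader r = null;
        try {
          FileChannel channel = fis.getChannel();
          // Seek to at most tailKb*1024 bytes before end-of-file.
          channel.position(Math.max(0, channel.size() - tailKb * 1024));
          r = new BufferedReader(new InputStreamReader(fis));
          r.readLine(); // skip the first, likely partial, line
          String line;
          while ((line = r.readLine()) != null) {
            out.println(line);
          }
        } finally {
          if (r != null) {
            r.close(); // closing the reader also closes the wrapped stream
          } else {
            fis.close();
          }
        }
      }
    }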
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Wed Feb 13 20:58:23 2013
@@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -66,10 +66,9 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
@@ -77,7 +76,6 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -116,7 +114,6 @@ import com.google.protobuf.ByteString;
* or build components for protocol buffer requests.
*/
@InterfaceAudience.Private
-@SuppressWarnings("deprecation")
public final class RequestConverter {
private RequestConverter() {
@@ -170,7 +167,7 @@ public final class RequestConverter {
* @param regionName the name of the region to get
* @param get the client Get
* @param existenceOnly indicate if checking row existence only
- * @return a protocol buffer GetReuqest
+ * @return a protocol buffer GetRequest
*/
public static GetRequest buildGetRequest(final byte[] regionName,
final Get get, final boolean existenceOnly) throws IOException {
@@ -184,6 +181,27 @@ public final class RequestConverter {
}
/**
+ * Create a protocol buffer MultiGetRequest for client Gets. All gets are going to be run
+ * against the same region.
+ * @param regionName the name of the region to get from
+ * @param gets the client Gets
+ * @param existenceOnly indicate if checking row existence only
+ * @param closestRowBefore indicate if the closest row before the requested one should be
+ * returned when the exact row is missing
+ * @return a protocol buffer MultiGetRequest
+ */
+ public static MultiGetRequest buildMultiGetRequest(final byte[] regionName, final List<Get> gets,
+ final boolean existenceOnly, final boolean closestRowBefore) throws IOException {
+ MultiGetRequest.Builder builder = MultiGetRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+ builder.setExistenceOnly(existenceOnly);
+ builder.setClosestRowBefore(closestRowBefore);
+ builder.setRegion(region);
+ for (Get get : gets) {
+ builder.addGet(ProtobufUtil.toGet(get));
+ }
+ return builder.build();
+ }
+
+ /**
* Create a protocol buffer MutateRequest for a client increment
*
* @param regionName
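A caller-side sketch of the buildMultiGetRequest method added above (regionName and the row keys are placeholders; all Gets must target the same region):

    List<Get> gets = new ArrayList<Get>();
    gets.add(new Get(Bytes.toBytes("row1")));
    gets.add(new Get(Bytes.toBytes("row2")));
    // existenceOnly=false fetches full rows; closestRowBefore=false disables
    // closest-row-before lookup semantics.
    MultiGetRequest request =
        RequestConverter.buildMultiGetRequest(regionName, gets, false, false);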
@@ -437,40 +455,6 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer LockRowRequest
- *
- * @param regionName
- * @param row
- * @return a lock row request
- */
- public static LockRowRequest buildLockRowRequest(
- final byte[] regionName, final byte[] row) {
- LockRowRequest.Builder builder = LockRowRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
- builder.addRow(ByteString.copyFrom(row));
- return builder.build();
- }
-
- /**
- * Create a protocol buffer UnlockRowRequest
- *
- * @param regionName
- * @param lockId
- * @return a unlock row request
- */
- public static UnlockRowRequest buildUnlockRowRequest(
- final byte[] regionName, final long lockId) {
- UnlockRowRequest.Builder builder = UnlockRowRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
- builder.setLockId(lockId);
- return builder.build();
- }
-
- /**
* Create a protocol buffer bulk load request
*
* @param familyPaths
@@ -496,24 +480,6 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer coprocessor exec request
- *
- * @param regionName
- * @param exec
- * @return a coprocessor exec request
- * @throws IOException
- */
- public static ExecCoprocessorRequest buildExecCoprocessorRequest(
- final byte[] regionName, final Exec exec) throws IOException {
- ExecCoprocessorRequest.Builder builder = ExecCoprocessorRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
- builder.setCall(ProtobufUtil.toExec(exec));
- return builder.build();
- }
-
- /**
* Create a protocol buffer multi request for a list of actions.
* RowMutations in the list (if any) will be ignored.
*
@@ -538,8 +504,6 @@ public final class RequestConverter {
protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put)row));
} else if (row instanceof Delete) {
protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row));
- } else if (row instanceof Exec) {
- protoAction.setExec(ProtobufUtil.toExec((Exec)row));
} else if (row instanceof Append) {
protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append)row));
} else if (row instanceof Increment) {
@@ -1054,9 +1018,10 @@ public final class RequestConverter {
* @param table
* @return a GetSchemaAlterStatusRequest
*/
- public static GetSchemaAlterStatusRequest buildGetSchemaAlterStatusRequest(final byte [] table) {
+ public static GetSchemaAlterStatusRequest buildGetSchemaAlterStatusRequest(
+ final byte [] tableName) {
GetSchemaAlterStatusRequest.Builder builder = GetSchemaAlterStatusRequest.newBuilder();
- builder.setTableName(ByteString.copyFrom(table));
+ builder.setTableName(ByteString.copyFrom(tableName));
return builder.build();
}
@@ -1151,6 +1116,78 @@ public final class RequestConverter {
}
/**
+ * Create a request to grant user permissions.
+ *
+ * @param username the short user name to grant permissions to
+ * @param table optional table name the permissions apply to
+ * @param family optional column family
+ * @param qualifier optional qualifier
+ * @param actions the permissions to be granted
+ * @return A {@link AccessControlProtos} GrantRequest
+ */
+ public static AccessControlProtos.GrantRequest buildGrantRequest(
+ String username, byte[] table, byte[] family, byte[] qualifier,
+ AccessControlProtos.Permission.Action... actions) {
+ AccessControlProtos.Permission.Builder permissionBuilder =
+ AccessControlProtos.Permission.newBuilder();
+ for (AccessControlProtos.Permission.Action a : actions) {
+ permissionBuilder.addAction(a);
+ }
+ if (table != null) {
+ permissionBuilder.setTable(ByteString.copyFrom(table));
+ }
+ if (family != null) {
+ permissionBuilder.setFamily(ByteString.copyFrom(family));
+ }
+ if (qualifier != null) {
+ permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
+ }
+
+ return AccessControlProtos.GrantRequest.newBuilder()
+ .setPermission(
+ AccessControlProtos.UserPermission.newBuilder()
+ .setUser(ByteString.copyFromUtf8(username))
+ .setPermission(permissionBuilder.build())
+ ).build();
+ }
+
+ /**
+ * Create a request to revoke user permissions.
+ *
+ * @param username the short user name whose permissions are to be revoked
+ * @param table optional table name the permissions apply to
+ * @param family optional column family
+ * @param qualifier optional qualifier
+ * @param actions the permissions to be revoked
+ * @return A {@link AccessControlProtos} RevokeRequest
+ */
+ public static AccessControlProtos.RevokeRequest buildRevokeRequest(
+ String username, byte[] table, byte[] family, byte[] qualifier,
+ AccessControlProtos.Permission.Action... actions) {
+ AccessControlProtos.Permission.Builder permissionBuilder =
+ AccessControlProtos.Permission.newBuilder();
+ for (AccessControlProtos.Permission.Action a : actions) {
+ permissionBuilder.addAction(a);
+ }
+ if (table != null) {
+ permissionBuilder.setTable(ByteString.copyFrom(table));
+ }
+ if (family != null) {
+ permissionBuilder.setFamily(ByteString.copyFrom(family));
+ }
+ if (qualifier != null) {
+ permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
+ }
+
+ return AccessControlProtos.RevokeRequest.newBuilder()
+ .setPermission(
+ AccessControlProtos.UserPermission.newBuilder()
+ .setUser(ByteString.copyFromUtf8(username))
+ .setPermission(permissionBuilder.build())
+ ).build();
+ }
+
+ /**
* Create a RegionOpenInfo based on given region info and version of offline node
*/
private static RegionOpenInfo buildRegionOpenInfo(
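A hedged usage sketch for the grant/revoke builders added above ("bob" and the table/family names are placeholders; the Action values come from the generated AccessControlProtos):

    AccessControlProtos.GrantRequest grant = RequestConverter.buildGrantRequest(
        "bob",
        Bytes.toBytes("mytable"),
        Bytes.toBytes("cf"),
        null,                                      // no qualifier: whole family
        AccessControlProtos.Permission.Action.READ,
        AccessControlProtos.Permission.Action.WRITE);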
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Wed Feb 13 20:58:23 2013
@@ -91,7 +91,8 @@ public final class ResponseConverter {
if (result.hasException()) {
results.add(ProtobufUtil.toException(result.getException()));
} else if (result.hasValue()) {
- Object value = ProtobufUtil.toObject(result.getValue());
+ ClientProtos.Result r = result.getValue();
+ Object value = ProtobufUtil.toResult(r);
if (value instanceof ClientProtos.Result) {
results.add(ProtobufUtil.toResult((ClientProtos.Result)value));
} else {
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Wed Feb 13 20:58:23 2013
@@ -250,9 +250,12 @@ public class CompactSplitThread implemen
while (!done) {
try {
done = t.awaitTermination(60, TimeUnit.SECONDS);
- LOG.debug("Waiting for " + name + " to finish...");
+ LOG.info("Waiting for " + name + " to finish...");
+ if (!done) {
+ t.shutdownNow();
+ }
} catch (InterruptedException ie) {
- LOG.debug("Interrupted waiting for " + name + " to finish...");
+ LOG.warn("Interrupted waiting for " + name + " to finish...");
}
}
}
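The change above escalates from a polite awaitTermination() to shutdownNow() once the 60-second grace period lapses; a generic standalone sketch of that shutdown pattern (names illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    class PoolShutdown {
      // Blocks until the pool drains, force-stopping it after each timeout.
      static void waitForCompletion(ExecutorService pool, String name) {
        boolean done = false;
        while (!done) {
          try {
            done = pool.awaitTermination(60, TimeUnit.SECONDS);
            System.out.println("Waiting for " + name + " to finish...");
            if (!done) {
              pool.shutdownNow(); // interrupt workers, drop queued tasks
            }
          } catch (InterruptedException ie) {
            // retry; a production version would log and restore interrupt status
          }
        }
      }
    }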
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Wed Feb 13 20:58:23 2013
@@ -157,10 +157,12 @@ public class CompactionTool extends Conf
HStore store = getStore(region, familyDir);
do {
CompactionRequest cr = store.requestCompaction();
- StoreFile storeFile = store.compact(cr);
- if (storeFile != null) {
+ List<StoreFile> storeFiles = store.compact(cr);
+ if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) {
- fs.delete(storeFile.getPath(), false);
+ for (StoreFile storeFile: storeFiles) {
+ fs.delete(storeFile.getPath(), false);
+ }
}
}
} while (store.needsCompaction() && !compactOnce);
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
/**
* A {@link RegionSplitPolicy} implementation which splits a region
@@ -36,14 +38,15 @@ public class ConstantSizeRegionSplitPoli
@Override
protected void configureForRegion(HRegion region) {
super.configureForRegion(region);
- long maxFileSize = region.getTableDesc().getMaxFileSize();
-
- // By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE.
- if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
- maxFileSize = getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+ Configuration conf = getConf();
+ HTableDescriptor desc = region.getTableDesc();
+ if (desc != null) {
+ this.desiredMaxFileSize = desc.getMaxFileSize();
+ }
+ if (this.desiredMaxFileSize <= 0) {
+ this.desiredMaxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
}
- this.desiredMaxFileSize = maxFileSize;
}
@Override
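With the rewrite above, a per-table max file size now wins over the site configuration, which is consulted only when the descriptor leaves the value unset (<= 0). A sketch of the two knobs from the operator's side ("mytable" and the sizes are placeholders):

    // Per-table override, read first by the split policy:
    HTableDescriptor htd = new HTableDescriptor("mytable");
    htd.setMaxFileSize(20L * 1024 * 1024 * 1024);   // split after ~20 GB

    // Cluster-wide fallback used when the table leaves maxFileSize unset:
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 10L * 1024 * 1024 * 1024);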
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
@@ -45,10 +46,15 @@ extends ConstantSizeRegionSplitPolicy {
@Override
protected void configureForRegion(HRegion region) {
super.configureForRegion(region);
- this.flushSize = region.getTableDesc() != null?
- region.getTableDesc().getMemStoreFlushSize():
- getConf().getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+ Configuration conf = getConf();
+ HTableDescriptor desc = region.getTableDesc();
+ if (desc != null) {
+ this.flushSize = desc.getMemStoreFlushSize();
+ }
+ if (this.flushSize <= 0) {
+ this.flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+ }
}
@Override
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.classification.
public class KeyPrefixRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
private static final Log LOG = LogFactory
.getLog(KeyPrefixRegionSplitPolicy.class);
- public static String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
+ public static final String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
private int prefixLength = 0;
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Wed Feb 13 20:58:23 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -47,7 +48,7 @@ class LogRoller extends HasThread implem
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
private final Server server;
- private final RegionServerServices services;
+ protected final RegionServerServices services;
private volatile long lastrolltime = System.currentTimeMillis();
// Period to roll log.
private final long rollperiod;
@@ -92,7 +93,7 @@ class LogRoller extends HasThread implem
try {
this.lastrolltime = now;
// This is array of actual region names.
- byte [][] regionsToFlush = this.services.getWAL().rollWriter(rollLog.get());
+ byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
if (regionsToFlush != null) {
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
@@ -159,6 +160,10 @@ class LogRoller extends HasThread implem
}
}
+ protected HLog getWAL() throws IOException {
+ return this.services.getWAL(null);
+ }
+
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
// Not interested
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Wed Feb 13 20:58:23 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@@ -29,10 +30,10 @@ import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.RemoteExc
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;
@@ -59,7 +61,7 @@ import com.google.common.base.Preconditi
* @see FlushRequester
*/
@InterfaceAudience.Private
-class MemStoreFlusher extends HasThread implements FlushRequester {
+class MemStoreFlusher implements FlushRequester {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
// These two data members go together. Any entry in the one must have
// a corresponding entry in the other.
@@ -71,8 +73,8 @@ class MemStoreFlusher extends HasThread
private final long threadWakeFrequency;
private final HRegionServer server;
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition flushOccurred = lock.newCondition();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Object blockSignal = new Object();
protected final long globalMemStoreLimit;
protected final long globalMemStoreLimitLowMark;
@@ -87,6 +89,9 @@ class MemStoreFlusher extends HasThread
private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter();
+ private FlushHandler[] flushHandlers = null;
+ private int handlerCount;
+
/**
* @param conf
* @param server
@@ -111,6 +116,7 @@ class MemStoreFlusher extends HasThread
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000);
+ this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
LOG.info("globalMemStoreLimit=" +
StringUtils.humanReadableInt(this.globalMemStoreLimit) +
", globalMemStoreLimitLowMark=" +
@@ -213,64 +219,59 @@ class MemStoreFlusher extends HasThread
return true;
}
- @Override
- public void run() {
- while (!this.server.isStopped()) {
- FlushQueueEntry fqe = null;
- try {
- wakeupPending.set(false); // allow someone to wake us up again
- fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
- if (fqe == null || fqe instanceof WakeupFlushThread) {
- if (isAboveLowWaterMark()) {
- LOG.debug("Flush thread woke up because memory above low water=" +
- StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
- if (!flushOneForGlobalPressure()) {
- // Wasn't able to flush any region, but we're above low water mark
- // This is unlikely to happen, but might happen when closing the
- // entire server - another thread is flushing regions. We'll just
- // sleep a little bit to avoid spinning, and then pretend that
- // we flushed one, so anyone blocked will check again
- lock.lock();
- try {
+ private class FlushHandler extends HasThread {
+ @Override
+ public void run() {
+ while (!server.isStopped()) {
+ FlushQueueEntry fqe = null;
+ try {
+ wakeupPending.set(false); // allow someone to wake us up again
+ fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+ if (fqe == null || fqe instanceof WakeupFlushThread) {
+ if (isAboveLowWaterMark()) {
+ LOG.debug("Flush thread woke up because memory above low water="
+ + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+ if (!flushOneForGlobalPressure()) {
+ // Wasn't able to flush any region, but we're above low water mark
+ // This is unlikely to happen, but might happen when closing the
+ // entire server - another thread is flushing regions. We'll just
+ // sleep a little bit to avoid spinning, and then pretend that
+ // we flushed one, so anyone blocked will check again
Thread.sleep(1000);
- flushOccurred.signalAll();
- } finally {
- lock.unlock();
+ wakeUpIfBlocking();
}
+ // Enqueue another one of these tokens so we'll wake up again
+ wakeupFlushThread();
}
- // Enqueue another one of these tokens so we'll wake up again
- wakeupFlushThread();
+ continue;
+ }
+ FlushRegionEntry fre = (FlushRegionEntry) fqe;
+ if (!flushRegion(fre)) {
+ break;
}
+ } catch (InterruptedException ex) {
continue;
- }
- FlushRegionEntry fre = (FlushRegionEntry)fqe;
- if (!flushRegion(fre)) {
- break;
- }
- } catch (InterruptedException ex) {
- continue;
- } catch (ConcurrentModificationException ex) {
- continue;
- } catch (Exception ex) {
- LOG.error("Cache flusher failed for entry " + fqe, ex);
- if (!server.checkFileSystem()) {
- break;
+ } catch (ConcurrentModificationException ex) {
+ continue;
+ } catch (Exception ex) {
+ LOG.error("Cache flusher failed for entry " + fqe, ex);
+ if (!server.checkFileSystem()) {
+ break;
+ }
}
}
- }
- this.regionsInQueue.clear();
- this.flushQueue.clear();
+ synchronized (regionsInQueue) {
+ regionsInQueue.clear();
+ flushQueue.clear();
+ }
- // Signal anyone waiting, so they see the close flag
- lock.lock();
- try {
- flushOccurred.signalAll();
- } finally {
- lock.unlock();
+ // Signal anyone waiting, so they see the close flag
+ wakeUpIfBlocking();
+ LOG.info(getName() + " exiting");
}
- LOG.info(getName() + " exiting");
}
+
private void wakeupFlushThread() {
if (wakeupPending.compareAndSet(false, true)) {
flushQueue.add(new WakeupFlushThread());
@@ -287,6 +288,10 @@ class MemStoreFlusher extends HasThread
continue;
}
+ if (region.writestate.flushing || !region.writestate.writesEnabled) {
+ continue;
+ }
+
if (checkStoreFileCount && isTooManyStoreFiles(region)) {
continue;
}
@@ -332,11 +337,41 @@ class MemStoreFlusher extends HasThread
* Only interrupt once it's done with a run through the work loop.
*/
void interruptIfNecessary() {
- lock.lock();
+ lock.writeLock().lock();
try {
- this.interrupt();
+ for (FlushHandler flushHandler : flushHandlers) {
+ if (flushHandler != null) flushHandler.interrupt();
+ }
} finally {
- lock.unlock();
+ lock.writeLock().unlock();
+ }
+ }
+
+ synchronized void start(UncaughtExceptionHandler eh) {
+ ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
+ server.getServerName().toString() + "-MemStoreFlusher", eh);
+ flushHandlers = new FlushHandler[handlerCount];
+ for (int i = 0; i < flushHandlers.length; i++) {
+ flushHandlers[i] = new FlushHandler();
+ flusherThreadFactory.newThread(flushHandlers[i]);
+ flushHandlers[i].start();
+ }
+ }
+
+ boolean isAlive() {
+ for (FlushHandler flushHandler : flushHandlers) {
+ if (flushHandler != null && flushHandler.isAlive()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void join() {
+ for (FlushHandler flushHandler : flushHandlers) {
+ if (flushHandler != null) {
+ Threads.shutdown(flushHandler.getThread());
+ }
}
}
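The start() method above replaces the single flusher thread with hbase.hstore.flusher.count daemon handler threads; a stripped-down sketch of the fan-out with plain JDK threads (FlushLoop stands in for the FlushHandler work loop and is illustrative):

    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
    Thread[] flushers = new Thread[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      flushers[i] = new Thread(new FlushLoop(), "MemStoreFlusher." + i);
      flushers[i].setDaemon(true);  // do not keep the JVM alive for flushers
      flushers[i].start();
    }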
@@ -365,7 +400,8 @@ class MemStoreFlusher extends HasThread
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
if (!this.server.compactSplitThread.requestSplit(region)) {
try {
- this.server.compactSplitThread.requestCompaction(region, getName());
+ this.server.compactSplitThread.requestCompaction(region, Thread
+ .currentThread().getName());
} catch (IOException e) {
LOG.error(
"Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -404,8 +440,8 @@ class MemStoreFlusher extends HasThread
// emergencyFlush, then item was removed via a flushQueue.poll.
flushQueue.remove(fqe);
}
- lock.lock();
}
+ lock.readLock().lock();
try {
boolean shouldCompact = region.flushcache();
// We just want to check the size
@@ -413,7 +449,7 @@ class MemStoreFlusher extends HasThread
if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
- server.compactSplitThread.requestCompaction(region, getName());
+ server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
}
} catch (DroppedSnapshotException ex) {
@@ -432,15 +468,18 @@ class MemStoreFlusher extends HasThread
return false;
}
} finally {
- try {
- flushOccurred.signalAll();
- } finally {
- lock.unlock();
- }
+ lock.readLock().unlock();
+ wakeUpIfBlocking();
}
return true;
}
+ private void wakeUpIfBlocking() {
+ synchronized (blockSignal) {
+ blockSignal.notifyAll();
+ }
+ }
+
private boolean isTooManyStoreFiles(HRegion region) {
for (Store hstore : region.stores.values()) {
if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -458,12 +497,12 @@ class MemStoreFlusher extends HasThread
*/
public void reclaimMemStoreMemory() {
if (isAboveHighWaterMark()) {
- lock.lock();
- try {
+ long start = System.currentTimeMillis();
+ synchronized (this.blockSignal) {
boolean blocked = false;
long startTime = 0;
while (isAboveHighWaterMark() && !server.isStopped()) {
- if(!blocked){
+ if (!blocked) {
startTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("Blocking updates on " + server.toString() +
": the global memstore size " +
@@ -476,10 +515,12 @@ class MemStoreFlusher extends HasThread
try {
// we should be able to wait forever, but we've seen a bug where
// we miss a notify, so put a 5 second bound on it at least.
- flushOccurred.await(5, TimeUnit.SECONDS);
+ blockSignal.wait(5 * 1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
+ long took = System.currentTimeMillis() - start;
+ LOG.warn("Memstore is above high water mark and block " + took + "ms");
}
if(blocked){
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -488,8 +529,6 @@ class MemStoreFlusher extends HasThread
}
LOG.info("Unblocking updates for server " + server.toString());
}
- } finally {
- lock.unlock();
}
} else if (isAboveLowWaterMark()) {
wakeupFlushThread();
@@ -500,21 +539,21 @@ class MemStoreFlusher extends HasThread
return "flush_queue="
+ flushQueue.size();
}
-
+
public String dumpQueue() {
StringBuilder queueList = new StringBuilder();
queueList.append("Flush Queue Queue dump:\n");
queueList.append(" Flush Queue:\n");
java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
-
+
while(it.hasNext()){
queueList.append(" "+it.next().toString());
queueList.append("\n");
}
-
+
return queueList.toString();
}
-
+
interface FlushQueueEntry extends Delayed {}
/**
@@ -530,6 +569,12 @@ class MemStoreFlusher extends HasThread
public int compareTo(Delayed o) {
return -1;
}
+
+ @Override
+ public boolean equals(Object obj) {
+ return (this == obj);
+ }
+
}
/**
@@ -597,5 +642,17 @@ class MemStoreFlusher extends HasThread
public String toString() {
return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ Delayed other = (Delayed) obj;
+ return compareTo(other) == 0;
+ }
}
}
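The equals() overrides added above matter because flush queue entries live in a DelayQueue, whose remove() relies on equals(); a self-contained sketch of a Delayed entry whose equals() is consistent with compareTo() (class name illustrative; unlike the patch, the sketch also overrides hashCode() to keep the usual Object contract):

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class DelayedEntry implements Delayed {
      private final long fireAtMillis;
      DelayedEntry(long fireAtMillis) { this.fireAtMillis = fireAtMillis; }

      public long getDelay(TimeUnit unit) {
        return unit.convert(fireAtMillis - System.currentTimeMillis(),
            TimeUnit.MILLISECONDS);
      }

      public int compareTo(Delayed o) {
        return Long.signum(getDelay(TimeUnit.MILLISECONDS)
            - o.getDelay(TimeUnit.MILLISECONDS));
      }

      @Override
      public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        return compareTo((Delayed) obj) == 0;  // consistent with ordering
      }

      @Override
      public int hashCode() {
        return (int) (fireAtMillis ^ (fireAtMillis >>> 32));
      }
    }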
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java Wed Feb 13 20:58:23 2013
@@ -40,11 +40,11 @@ interface OnlineRegions extends Server {
/**
* This method removes HRegion corresponding to hri from the Map of onlineRegions.
*
- * @param encodedRegionName
- * @param destination - destination, if any. Null otherwise
+ * @param r Region to remove.
+ * @param destination Destination, if any, null otherwise.
* @return True if we removed a region from online list.
*/
- public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination);
+ public boolean removeFromOnlineRegions(final HRegion r, ServerName destination);
/**
* Return {@link HRegion} instance.
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Wed Feb 13 20:58:23 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocess
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -135,6 +134,7 @@ public class RegionCoprocessorHost
*/
public RegionCoprocessorHost(final HRegion region,
final RegionServerServices rsServices, final Configuration conf) {
+ this.conf = conf;
this.rsServices = rsServices;
this.region = region;
this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
@@ -214,11 +214,6 @@ public class RegionCoprocessorHost
// It uses a visitor pattern to invoke registered Endpoint
// method.
for (Class c : implClass.getInterfaces()) {
- if (CoprocessorProtocol.class.isAssignableFrom(c)) {
- region.registerProtocol(c, (CoprocessorProtocol)instance);
- }
- // we allow endpoints to register as both CoproocessorProtocols and Services
- // for ease of transition
if (CoprocessorService.class.isAssignableFrom(c)) {
region.registerService( ((CoprocessorService)instance).getService() );
}
@@ -430,9 +425,11 @@ public class RegionCoprocessorHost
* Called prior to rewriting the store files selected for compaction
* @param store the store being compacted
* @param scanner the scanner used to read store data during compaction
- * @throws IOException
+ * @param scanType type of Scan
+ * @throws IOException
*/
- public InternalScanner preCompact(HStore store, InternalScanner scanner) throws IOException {
+ public InternalScanner preCompact(HStore store, InternalScanner scanner,
+ ScanType scanType) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
@@ -440,7 +437,7 @@ public class RegionCoprocessorHost
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
scanner = ((RegionObserver)env.getInstance()).preCompact(
- ctx, store, scanner);
+ ctx, store, scanner, scanType);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
}
@@ -503,7 +500,7 @@ public class RegionCoprocessorHost
/**
* Invoked before a memstore flush
- * @throws IOException
+ * @throws IOException
*/
public void preFlush() throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -524,7 +521,7 @@ public class RegionCoprocessorHost
/**
* See
- * {@link RegionObserver#preFlush(ObserverContext<RegionCoprocessorEnvironment>, HStore, KeyValueScanner)}
+ * {@link RegionObserver#preFlushScannerOpen(ObserverContext, HStore, KeyValueScanner, InternalScanner)}
*/
public InternalScanner preFlushScannerOpen(HStore store, KeyValueScanner memstoreScanner) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -607,7 +604,7 @@ public class RegionCoprocessorHost
}
}
}
-
+
/**
* Invoked just before a split
* @throws IOException
@@ -633,7 +630,7 @@ public class RegionCoprocessorHost
* Invoked just after a split
* @param l the new left-hand daughter region
* @param r the new right-hand daughter region
- * @throws IOException
+ * @throws IOException
*/
public void postSplit(HRegion l, HRegion r) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -651,7 +648,7 @@ public class RegionCoprocessorHost
}
}
}
-
+
/**
* Invoked just before the rollback of a failed split is started
* @throws IOException
@@ -672,7 +669,7 @@ public class RegionCoprocessorHost
}
}
}
-
+
/**
* Invoked just after the rollback of a failed split is done
* @throws IOException
@@ -693,7 +690,7 @@ public class RegionCoprocessorHost
}
}
}
-
+
/**
* Invoked after a split is completed irrespective of a failure or success.
* @throws IOException
@@ -1355,6 +1352,35 @@ public class RegionCoprocessorHost
}
/**
+ * This will be called by the scan flow when the current scanned row is being filtered out by the
+ * filter.
+ * @param s the scanner
+ * @param currentRow The current rowkey which got filtered out
+ * @return whether more rows are available for the scanner or not
+ * @throws IOException
+ */
+ public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow)
+ throws IOException {
+ boolean hasMore = true; // By default assume more rows there.
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
+ hasMore);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return hasMore;
+ }
+
+ /**
* @param s the scanner
* @return true if default behavior should be bypassed, false otherwise
* @exception IOException Exception
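A coprocessor picks up the new hook by overriding it in a RegionObserver; a minimal sketch (observer class name illustrative, signature as invoked by the host code above, BaseRegionObserver assumed from the same codebase):

    public class RowFilterAuditObserver extends BaseRegionObserver {
      @Override
      public boolean postScannerFilterRow(
          ObserverContext<RegionCoprocessorEnvironment> c,
          InternalScanner s, byte[] currentRow, boolean hasMore)
          throws IOException {
        // Called when the scan filters out currentRow; pass hasMore through
        // unless the observer wants to terminate the scan early.
        return hasMore;
      }
    }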
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Wed Feb 13 20:58:23 2013
@@ -19,10 +19,11 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -38,8 +39,9 @@ public interface RegionServerServices ex
*/
public boolean isStopping();
- /** @return the HLog */
- public HLog getWAL();
+ /** @return the HLog for a particular region. Pass null for getting the
+ * default (common) WAL */
+ public HLog getWAL(HRegionInfo regionInfo) throws IOException;
/**
* @return Implementation of {@link CompactionRequestor} or null.
@@ -79,7 +81,7 @@ public interface RegionServerServices ex
* Get the regions that are currently being opened or closed in the RS
* @return map of regions in transition in this RS
*/
- public Map<byte[], Boolean> getRegionsInTransitionInRS();
+ public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS();
/**
* @return Return the FileSystem object used by the regionserver
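With the signature change above, existing callers obtain the default WAL by passing null, while WAL-per-region implementations can key off the region (rs stands for any RegionServerServices; regionInfo is a placeholder):

    HLog defaultWal = rs.getWAL(null);        // the common, shared WAL
    HLog regionWal  = rs.getWAL(regionInfo);  // may differ if WALs are sharded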
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -91,10 +91,9 @@ public abstract class RegionSplitPolicy
/**
* Create the RegionSplitPolicy configured for the given table.
- * Each
* @param region
* @param conf
- * @return
+ * @return a RegionSplitPolicy
* @throws IOException
*/
public static RegionSplitPolicy create(HRegion region,
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Wed Feb 13 20:58:23 2013
@@ -119,7 +119,7 @@ public interface RowProcessor<S extends
/**
* This method should return any additional data that is needed on the
* server side to construct the RowProcessor. The server will pass this to
- * the {@link #initialize(ByteString)} method. If there is no RowProcessor
+ * the {@link #initialize(Message msg)} method. If there is no RowProcessor
* specific data then null should be returned.
* @return the PB message
* @throws IOException
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Feb 13 20:58:23 2013
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DeserializationException;
-import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
@@ -80,7 +79,7 @@ public class SplitLogWorker extends ZooK
private volatile String currentTask = null;
private int currentVersion;
private volatile boolean exitWorker;
- private Object grabTaskLock = new Object();
+ private final Object grabTaskLock = new Object();
private boolean workerInGrabTask = false;
@@ -109,8 +108,8 @@ public class SplitLogWorker extends ZooK
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
- if (HLogSplitter.splitLogFile(rootdir,
- fs.getFileStatus(new Path(filename)), fs, conf, p, sequenceIdChecker) == false) {
+ if (!HLogSplitter.splitLogFile(rootdir,
+ fs.getFileStatus(new Path(filename)), fs, conf, p, sequenceIdChecker)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@@ -249,13 +248,13 @@ public class SplitLogWorker extends ZooK
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
- if (slt.isUnassigned() == false) {
+ if (!slt.isUnassigned()) {
SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
return;
}
currentVersion = stat.getVersion();
- if (attemptToOwnTask(true) == false) {
+ if (!attemptToOwnTask(true)) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
@@ -277,7 +276,7 @@ public class SplitLogWorker extends ZooK
@Override
public boolean progress() {
- if (attemptToOwnTask(false) == false) {
+ if (!attemptToOwnTask(false)) {
LOG.warn("Failed to heartbeat the task" + currentTask);
return false;
}
@@ -323,7 +322,6 @@ public class SplitLogWorker extends ZooK
Thread.interrupted();
}
}
- return;
}
/**
@@ -395,8 +393,7 @@ public class SplitLogWorker extends ZooK
} catch (KeeperException e) {
LOG.warn("failed to end task, " + path + " " + slt, e);
}
- SplitLogCounters.tot_wkr_final_transistion_failed.incrementAndGet();
- return;
+ SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
}
void getDataSetWatchAsync() {
@@ -531,7 +528,6 @@ public class SplitLogWorker extends ZooK
worker = new Thread(null, this, "SplitLogWorker-" + serverName);
exitWorker = false;
worker.start();
- return;
}
/**
@@ -558,7 +554,6 @@ public class SplitLogWorker extends ZooK
}
data = watcher.getRecoverableZooKeeper().removeMetaData(data);
getDataSetWatchSuccess(path, data);
- return;
}
}
@@ -574,7 +569,7 @@ public class SplitLogWorker extends ZooK
DONE(),
ERR(),
RESIGNED(),
- PREEMPTED();
+ PREEMPTED()
}
public Status exec(String name, CancelableProgressable p);
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Wed Feb 13 20:58:23 2013
@@ -296,7 +296,7 @@ public class SplitTransaction {
throw new IOException(errorMsg);
}
if (!testing) {
- services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName(), null);
+ services.removeFromOnlineRegions(this.parent, null);
}
this.journal.add(JournalEntry.OFFLINED_PARENT);
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Wed Feb 13 20:58:23 2013
@@ -219,7 +219,7 @@ public class StoreFileScanner implements
*
* @param s
* @param k
- * @return
+ * @return false if not found or if k is after the end.
* @throws IOException
*/
public static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
@@ -279,7 +279,7 @@ public class StoreFileScanner implements
boolean haveToSeek = true;
if (useBloom) {
// check ROWCOL Bloom filter first.
- if (reader.getBloomFilterType() == StoreFile.BloomType.ROWCOL) {
+ if (reader.getBloomFilterType() == BloomType.ROWCOL) {
haveToSeek = reader.passesGeneralBloomFilter(kv.getBuffer(),
kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
kv.getQualifierOffset(), kv.getQualifierLength());
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Feb 13 20:58:23 2013
@@ -44,38 +44,38 @@ import org.apache.hadoop.hbase.util.Envi
public class StoreScanner extends NonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
- private HStore store;
- private ScanQueryMatcher matcher;
- private KeyValueHeap heap;
- private boolean cacheBlocks;
-
- private int countPerRow = 0;
- private int storeLimit = -1;
- private int storeOffset = 0;
+ protected HStore store;
+ protected ScanQueryMatcher matcher;
+ protected KeyValueHeap heap;
+ protected boolean cacheBlocks;
+
+ protected int countPerRow = 0;
+ protected int storeLimit = -1;
+ protected int storeOffset = 0;
// Used to indicate that the scanner has closed (see HBASE-1107)
// Doesnt need to be volatile because it's always accessed via synchronized methods
- private boolean closing = false;
- private final boolean isGet;
- private final boolean explicitColumnQuery;
- private final boolean useRowColBloom;
- private final Scan scan;
- private final NavigableSet<byte[]> columns;
- private final long oldestUnexpiredTS;
- private final int minVersions;
+ protected boolean closing = false;
+ protected final boolean isGet;
+ protected final boolean explicitColumnQuery;
+ protected final boolean useRowColBloom;
+ protected final Scan scan;
+ protected final NavigableSet<byte[]> columns;
+ protected final long oldestUnexpiredTS;
+ protected final int minVersions;
/** We don't ever expect to change this, the constant is just for clarity. */
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
/** Used during unit testing to ensure that lazy seek does save seek ops */
- private static boolean lazySeekEnabledGlobally =
+ protected static boolean lazySeekEnabledGlobally =
LAZY_SEEK_ENABLED_BY_DEFAULT;
// if heap == null and lastTop != null, you need to reseek given the key below
- private KeyValue lastTop = null;
+ protected KeyValue lastTop = null;
/** An internal constructor. */
- private StoreScanner(HStore store, boolean cacheBlocks, Scan scan,
+ protected StoreScanner(HStore store, boolean cacheBlocks, Scan scan,
final NavigableSet<byte[]> columns, long ttl, int minVersions) {
this.store = store;
this.cacheBlocks = cacheBlocks;
@@ -203,7 +203,7 @@ public class StoreScanner extends NonLaz
* Get a filtered list of scanners. Assumes we are not in a compaction.
* @return list of scanners to seek
*/
- private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
+ protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
final boolean isCompaction = false;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
isCompaction, matcher));
@@ -213,7 +213,7 @@ public class StoreScanner extends NonLaz
* Filters the given list of scanners using Bloom filter, time range, and
* TTL.
*/
- private List<KeyValueScanner> selectScannersFrom(
+ protected List<KeyValueScanner> selectScannersFrom(
final List<? extends KeyValueScanner> allScanners) {
boolean memOnly;
boolean filesOnly;
@@ -277,13 +277,8 @@ public class StoreScanner extends NonLaz
@Override
public synchronized boolean seek(KeyValue key) throws IOException {
- if (this.heap == null) {
-
- List<KeyValueScanner> scanners = getScannersNoCompaction();
-
- heap = new KeyValueHeap(scanners, store.comparator);
- }
-
+    // reset matcher state, in case the underlying store changed
+ checkReseek();
return this.heap.seek(key);
}
@@ -491,7 +486,7 @@ public class StoreScanner extends NonLaz
* next KV)
* @throws IOException
*/
- private boolean checkReseek() throws IOException {
+ protected boolean checkReseek() throws IOException {
if (this.heap == null && this.lastTop != null) {
resetScannerStack(this.lastTop);
if (this.heap.peek() == null
@@ -507,7 +502,7 @@ public class StoreScanner extends NonLaz
return false;
}
- private void resetScannerStack(KeyValue lastTopKey) throws IOException {
+ protected void resetScannerStack(KeyValue lastTopKey) throws IOException {
if (heap != null) {
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
}
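The private-to-protected sweep above, together with seek() now routing through checkReseek(), is what makes StoreScanner subclassable: a subclass can swap scanner selection or reseek behavior without copying the class. A minimal sketch of what that enables; the subclass, its name, and the isFileScanner() test are assumptions, not part of this commit:

    // Hypothetical subclass made possible by the visibility changes above.
    // Assumes java.util.{ArrayList, List, NavigableSet} and the regionserver
    // types referenced below are imported.
    public class MemStoreOnlyScanner extends StoreScanner {
      protected MemStoreOnlyScanner(HStore store, boolean cacheBlocks, Scan scan,
          NavigableSet<byte[]> columns, long ttl, int minVersions) {
        super(store, cacheBlocks, scan, columns, ttl, minVersions);
      }

      @Override
      protected List<KeyValueScanner> selectScannersFrom(
          final List<? extends KeyValueScanner> allScanners) {
        // Keep only memstore-backed scanners; drop store file scanners.
        List<KeyValueScanner> picked = new ArrayList<KeyValueScanner>();
        for (KeyValueScanner scanner : allScanners) {
          if (!scanner.isFileScanner()) {  // isFileScanner() assumed to exist
            picked.add(scanner);
          }
        }
        return picked;
      }
    }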
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java Wed Feb 13 20:58:23 2013
@@ -148,6 +148,4 @@ public class TimeRangeTracker implements
public String toString() {
return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
}
-
}
-
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Wed Feb 13 20:58:23 2013
@@ -19,391 +19,133 @@
package org.apache.hadoop.hbase.regionserver.compactions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
+import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.StringUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.Random;
+import org.apache.hadoop.util.ReflectionUtils;
/**
- * The default (and only, as of now) algorithm for selecting files for compaction.
- * Combines the compaction configuration and the provisional file selection that
- * it's given to produce the list of suitable candidates for compaction.
+ * A compaction policy determines how to select files for compaction,
+ * how to compact them, and how to generate the compacted files.
*/
@InterfaceAudience.Private
-public class CompactionPolicy {
+public abstract class CompactionPolicy extends Configured {
+
+ /**
+ * The name of the configuration parameter that specifies
+ * the class of a compaction policy that is used to compact
+ * HBase store files.
+ */
+ public static final String COMPACTION_POLICY_KEY =
+ "hbase.hstore.compaction.policy";
- private static final Log LOG = LogFactory.getLog(CompactionPolicy.class);
- private final static Calendar calendar = new GregorianCalendar();
+ private static final Class<? extends CompactionPolicy>
+ DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
CompactionConfiguration comConf;
- StoreConfiguration storeConfig;
-
- public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
- updateConfiguration(configuration, storeConfig);
- }
+ Compactor compactor;
+ HStore store;
/**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
- public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
- boolean isUserCompaction, boolean forceMajor)
- throws IOException {
-    // Preliminary compaction subject to filters
- CompactSelection candidateSelection = new CompactSelection(candidateFiles);
- long cfTtl = this.storeConfig.getStoreFileTtl();
- if (!forceMajor) {
- // If there are expired files, only select them so that compaction deletes them
- if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
- CompactSelection expiredSelection = selectExpiredStoreFiles(
- candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
- if (expiredSelection != null) {
- return expiredSelection;
- }
- }
- candidateSelection = skipLargeFiles(candidateSelection);
- }
-
- // Force a major compaction if this is a user-requested major compaction,
- // or if we do not have too many files to compact and this was requested
- // as a major compaction.
- // Or, if there are any references among the candidates.
- boolean majorCompaction = (
- (forceMajor && isUserCompaction)
- || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
- && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
- || StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
- );
-
- if (!majorCompaction) {
- // we're doing a minor compaction, let's see what files are applicable
- candidateSelection = filterBulk(candidateSelection);
- candidateSelection = applyCompactionPolicy(candidateSelection);
- candidateSelection = checkMinFilesCriteria(candidateSelection);
- }
- candidateSelection =
- removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
- return candidateSelection;
- }
+ public abstract CompactSelection selectCompaction(
+ final List<StoreFile> candidateFiles, final boolean isUserCompaction,
+ final boolean forceMajor) throws IOException;
/**
- * Updates the compaction configuration. Used for tests.
- * TODO: replace when HBASE-3909 is completed in some form.
+ * @param filesToCompact Files to compact. Can be null.
+ * @return True if we should run a major compaction.
*/
- public void updateConfiguration(Configuration configuration,
- StoreConfiguration storeConfig) {
- this.comConf = new CompactionConfiguration(configuration, storeConfig);
- this.storeConfig = storeConfig;
- }
+ public abstract boolean isMajorCompaction(
+ final List<StoreFile> filesToCompact) throws IOException;
/**
- * Select the expired store files to compact
- *
- * @param candidates the initial set of storeFiles
- * @param maxExpiredTimeStamp
- * The store file will be marked as expired if its max time stamp is
- * less than this maxExpiredTimeStamp.
- * @return A CompactSelection contains the expired store files as
- * filesToCompact
+ * @param compactionSize Total size of some compaction
+ * @return whether this should be a large or small compaction
*/
- private CompactSelection selectExpiredStoreFiles(
- CompactSelection candidates, long maxExpiredTimeStamp) {
- List<StoreFile> filesToCompact = candidates.getFilesToCompact();
- if (filesToCompact == null || filesToCompact.size() == 0)
- return null;
- ArrayList<StoreFile> expiredStoreFiles = null;
- boolean hasExpiredStoreFiles = false;
- CompactSelection expiredSFSelection = null;
-
- for (StoreFile storeFile : filesToCompact) {
- if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
- LOG.info("Deleting the expired store file by compaction: "
- + storeFile.getPath() + " whose maxTimeStamp is "
- + storeFile.getReader().getMaxTimestamp()
- + " while the max expired timestamp is " + maxExpiredTimeStamp);
- if (!hasExpiredStoreFiles) {
- expiredStoreFiles = new ArrayList<StoreFile>();
- hasExpiredStoreFiles = true;
- }
- expiredStoreFiles.add(storeFile);
- }
- }
-
- if (hasExpiredStoreFiles) {
- expiredSFSelection = new CompactSelection(expiredStoreFiles);
- }
- return expiredSFSelection;
- }
+ public abstract boolean throttleCompaction(long compactionSize);
/**
- * @param candidates pre-filtrate
- * @return filtered subset
- * exclude all files above maxCompactSize
- * Also save all references. We MUST compact them
+ * @param numCandidates Number of candidate store files
+ * @return whether a compactionSelection is possible
*/
- private CompactSelection skipLargeFiles(CompactSelection candidates) {
- int pos = 0;
- while (pos < candidates.getFilesToCompact().size() &&
- candidates.getFilesToCompact().get(pos).getReader().length() >
- comConf.getMaxCompactSize() &&
- !candidates.getFilesToCompact().get(pos).isReference()) {
- ++pos;
- }
- if (pos > 0) {
- LOG.debug("Some files are too large. Excluding " + pos
- + " files from compaction candidates");
- candidates.clearSubList(0, pos);
- }
- return candidates;
- }
+ public abstract boolean needsCompaction(int numCandidates);
/**
- * @param candidates pre-filtrate
- * @return filtered subset
- * exclude all bulk load files if configured
+   * Inform the policy that some configuration has been changed,
+   * so cached values should be updated if necessary.
*/
- private CompactSelection filterBulk(CompactSelection candidates) {
- candidates.getFilesToCompact().removeAll(Collections2.filter(
- candidates.getFilesToCompact(),
- new Predicate<StoreFile>() {
- @Override
- public boolean apply(StoreFile input) {
- return input.excludeFromMinorCompaction();
- }
- }));
- return candidates;
- }
-
- /**
- * @param candidates pre-filtrate
- * @return filtered subset
- * take upto maxFilesToCompact from the start
- */
- private CompactSelection removeExcessFiles(CompactSelection candidates,
- boolean isUserCompaction, boolean isMajorCompaction) {
- int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
- if (excess > 0) {
- if (isMajorCompaction && isUserCompaction) {
- LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
- " files because of a user-requested major compaction");
- } else {
- LOG.debug("Too many admissible files. Excluding " + excess
- + " files from compaction candidates");
- candidates.clearSubList(comConf.getMaxFilesToCompact(),
- candidates.getFilesToCompact().size());
- }
+ public void updateConfiguration() {
+ if (getConf() != null && store != null) {
+ comConf = new CompactionConfiguration(getConf(), store);
}
- return candidates;
}
+
/**
- * @param candidates pre-filtrate
- * @return filtered subset
- * forget the compactionSelection if we don't have enough files
+ * Get the compactor for this policy
+ * @return the compactor for this policy
*/
- private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
- int minFiles = comConf.getMinFilesToCompact();
- if (candidates.getFilesToCompact().size() < minFiles) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Not compacting files because we only have " +
- candidates.getFilesToCompact().size() +
- " files ready for compaction. Need " + minFiles + " to initiate.");
- }
- candidates.emptyFileList();
- }
- return candidates;
+ public Compactor getCompactor() {
+ return compactor;
}
/**
- * @param candidates pre-filtrate
- * @return filtered subset
- * -- Default minor compaction selection algorithm:
- * choose CompactSelection from candidates --
- * First exclude bulk-load files if indicated in configuration.
- * Start at the oldest file and stop when you find the first file that
- * meets compaction criteria:
- * (1) a recently-flushed, small file (i.e. <= minCompactSize)
- * OR
- * (2) within the compactRatio of sum(newer_files)
- * Given normal skew, any newer files will also meet this criteria
- * <p/>
- * Additional Note:
- * If fileSizes.size() >> maxFilesToCompact, we will recurse on
- * compact(). Consider the oldest files first to avoid a
- * situation where we always compact [end-threshold,end). Then, the
- * last file becomes an aggregate of the previous compactions.
- *
- * normal skew:
- *
- * older ----> newer (increasing seqID)
- * _
- * | | _
- * | | | | _
- * --|-|- |-|- |-|---_-------_------- minCompactSize
- * | | | | | | | | _ | |
- * | | | | | | | | | | | |
- * | | | | | | | | | | | |
- */
- CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
- if (candidates.getFilesToCompact().isEmpty()) {
- return candidates;
- }
-
- // we're doing a minor compaction, let's see what files are applicable
- int start = 0;
- double ratio = comConf.getCompactionRatio();
- if (isOffPeakHour() && candidates.trySetOffpeak()) {
- ratio = comConf.getCompactionRatioOffPeak();
- LOG.info("Running an off-peak compaction, selection ratio = " + ratio
- + ", numOutstandingOffPeakCompactions is now "
- + CompactSelection.getNumOutStandingOffPeakCompactions());
- }
-
- // get store file sizes for incremental compacting selection.
- int countOfFiles = candidates.getFilesToCompact().size();
- long[] fileSizes = new long[countOfFiles];
- long[] sumSize = new long[countOfFiles];
- for (int i = countOfFiles - 1; i >= 0; --i) {
- StoreFile file = candidates.getFilesToCompact().get(i);
- fileSizes[i] = file.getReader().length();
- // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
- int tooFar = i + comConf.getMaxFilesToCompact() - 1;
- sumSize[i] = fileSizes[i]
- + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
- }
-
-
- while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
- fileSizes[start] > Math.max(comConf.getMinCompactSize(),
- (long) (sumSize[start + 1] * ratio))) {
- ++start;
- }
- if (start < countOfFiles) {
- LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
- + " files from " + countOfFiles + " candidates");
- }
-
- candidates = candidates.getSubList(start, countOfFiles);
-
- return candidates;
- }
-
- /*
- * @param filesToCompact Files to compact. Can be null.
- * @return True if we should run a major compaction.
+ * Set the new configuration
*/
- public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
- throws IOException {
- boolean result = false;
- long mcTime = getNextMajorCompactTime(filesToCompact);
- if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
- return result;
- }
- // TODO: Use better method for determining stamp of last major (HBASE-2990)
- long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
- long now = System.currentTimeMillis();
- if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
- // Major compaction time has elapsed.
- long cfTtl = this.storeConfig.getStoreFileTtl();
- if (filesToCompact.size() == 1) {
- // Single file
- StoreFile sf = filesToCompact.get(0);
- Long minTimestamp = sf.getMinimumTimestamp();
- long oldest = (minTimestamp == null)
- ? Long.MIN_VALUE
- : now - minTimestamp.longValue();
- if (sf.isMajorCompaction() &&
- (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this +
- " because one (major) compacted file only and oldestTime " +
- oldest + "ms is < ttl=" + cfTtl);
- }
- } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
- LOG.debug("Major compaction triggered on store " + this +
- ", because keyvalues outdated; time since last major compaction " +
- (now - lowTimestamp) + "ms");
- result = true;
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this +
- "; time since last major compaction " + (now - lowTimestamp) + "ms");
- }
- result = true;
- }
- }
- return result;
- }
-
- public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
- // default = 24hrs
- long ret = comConf.getMajorCompactionPeriod();
- if (ret > 0) {
- // default = 20% = +/- 4.8 hrs
- double jitterPct = comConf.getMajorCompactionJitter();
- if (jitterPct > 0) {
- long jitter = Math.round(ret * jitterPct);
- // deterministic jitter avoids a major compaction storm on restart
- Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
- if (seed != null) {
- double rnd = (new Random(seed)).nextDouble();
- ret += jitter - Math.round(2L * jitter * rnd);
- } else {
- ret = 0; // no storefiles == no major compaction
- }
- }
- }
- return ret;
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ updateConfiguration();
}
/**
- * @param compactionSize Total size of some compaction
- * @return whether this should be a large or small compaction
+ * Upon construction, this method will be called with the HStore
+ * to be governed. It will be called once and only once.
*/
- public boolean throttleCompaction(long compactionSize) {
- return compactionSize > comConf.getThrottlePoint();
+ protected void configureForStore(HStore store) {
+ this.store = store;
+ updateConfiguration();
}
/**
- * @param numCandidates Number of candidate store files
- * @return whether a compactionSelection is possible
+ * Create the CompactionPolicy configured for the given HStore.
+   * @param store the store the policy will govern
+   * @param conf the current configuration
+   * @return a CompactionPolicy configured for the given store
+   * @throws IOException if the configured policy class cannot be loaded
*/
- public boolean needsCompaction(int numCandidates) {
- return numCandidates > comConf.getMinFilesToCompact();
+ public static CompactionPolicy create(HStore store,
+ Configuration conf) throws IOException {
+ Class<? extends CompactionPolicy> clazz =
+ getCompactionPolicyClass(store.getFamily(), conf);
+ CompactionPolicy policy = ReflectionUtils.newInstance(clazz, conf);
+ policy.configureForStore(store);
+ return policy;
}
- /**
- * @return whether this is off-peak hour
- */
- private boolean isOffPeakHour() {
- int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
- int startHour = comConf.getOffPeakStartHour();
- int endHour = comConf.getOffPeakEndHour();
- // If offpeak time checking is disabled just return false.
- if (startHour == endHour) {
- return false;
- }
- if (startHour < endHour) {
- return (currentHour >= startHour && currentHour < endHour);
+ static Class<? extends CompactionPolicy> getCompactionPolicyClass(
+ HColumnDescriptor family, Configuration conf) throws IOException {
+ String className = conf.get(COMPACTION_POLICY_KEY,
+ DEFAULT_COMPACTION_POLICY_CLASS.getName());
+
+ try {
+ Class<? extends CompactionPolicy> clazz =
+ Class.forName(className).asSubclass(CompactionPolicy.class);
+ return clazz;
+ } catch (Exception e) {
+ throw new IOException(
+ "Unable to load configured region compaction policy '"
+ + className + "' for column '" + family.getNameAsString()
+ + "'", e);
}
- return (currentHour >= startHour || currentHour < endHour);
}
-}
\ No newline at end of file
+}
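Taken together, the rewrite turns CompactionPolicy into a pluggable extension point: subclasses implement file selection, major-compaction and throttling decisions, and the need check, while create() instantiates whatever class hbase.hstore.compaction.policy names (defaulting to DefaultCompactionPolicy) and binds it to the store via configureForStore(). A hedged sketch of a custom policy; every method body here is deliberately naive and illustrative only:

    // Illustrative-only policy: no throttling, no majors, compact whatever it is handed.
    // Assumes java.io.IOException, java.util.List, and the compactions/regionserver
    // types below are imported from the packages shown in this diff.
    public class EagerCompactionPolicy extends CompactionPolicy {
      @Override
      public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
          boolean isUserCompaction, boolean forceMajor) throws IOException {
        return new CompactSelection(candidateFiles);  // take every candidate
      }
      @Override
      public boolean isMajorCompaction(List<StoreFile> filesToCompact) throws IOException {
        return false;  // never promote to a major compaction
      }
      @Override
      public boolean throttleCompaction(long compactionSize) {
        return false;  // always use the small-compaction pool
      }
      @Override
      public boolean needsCompaction(int numCandidates) {
        return numCandidates > 1;  // any two files are worth merging
      }
    }

Wiring it in is then a matter of setting hbase.hstore.compaction.policy to the class name in hbase-site.xml; CompactionPolicy.create(store, conf) loads the class reflectively and calls configureForStore() exactly once.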
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Wed Feb 13 20:58:23 2013
@@ -89,7 +89,7 @@ public class CompactionRequest implement
* Find out if a given region is in compaction now.
*
* @param regionId
- * @return
+ * @return a CompactionState
*/
public static CompactionState getCompactionState(
final long regionId) {
@@ -174,6 +174,11 @@ public class CompactionRequest implement
return this.hashCode() - request.hashCode();
}
+ @Override
+ public boolean equals(Object obj) {
+ return (this == obj);
+ }
+
/** Gets the HRegion for the request */
public HRegion getHRegion() {
return r;
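One caution on the new identity-based equals(): compareTo() above it still returns this.hashCode() - request.hashCode(), so two distinct requests with equal hash codes compare as 0 while equals() returns false (an ordering inconsistent with equals, which matters in a TreeSet or TreeMap), and the subtraction itself can overflow. A hedged illustration of the overflow, with values chosen purely to demonstrate:

    // Why "a - b" is fragile as a comparator: it overflows for far-apart ints.
    int a = Integer.MAX_VALUE;  // e.g. one request's hashCode()
    int b = -1;                 // e.g. another request's hashCode()
    System.out.println(a - b);                          // -2147483648: wrongly "less than"
    System.out.println(a < b ? -1 : (a == b ? 0 : 1));  // 1: the correct ordering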
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java Wed Feb 13 20:58:23 2013
@@ -126,13 +126,21 @@ public class CloseRegionHandler extends
// Check that this region is being served here
HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
if (region == null) {
- LOG.warn("Received CLOSE for region " + name +
- " but currently not serving");
+ LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring");
+ if (zk){
+ LOG.error("The znode is not modified as we are not serving " + name);
+ }
+ // TODO: do better than a simple warning
return;
}
// Close the region
try {
+ if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
+ // bad znode state
+        return; // We're not deleting the znode, because it's not ours...
+ }
+
// TODO: If we need to keep updating CLOSING stamp to prevent against
// a timeout if this is long-running, need to spin up a thread?
if (region.close(abort) == null) {
@@ -152,7 +160,7 @@ public class CloseRegionHandler extends
throw new RuntimeException(t);
}
- this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName(), destination);
+ this.rsServices.removeFromOnlineRegions(region, destination);
if (this.zk) {
if (setClosedState(this.expectedVersion, region)) {