Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/13 21:58:10 UTC
[01/11] hbase git commit: HBASE-21560 Return a new TableDescriptor
for MasterObserver#preModifyTable to allow coprocessors to modify the
TableDescriptor [Forced Update!]
Repository: hbase
Updated Branches:
refs/heads/HBASE-20952 fb59426b7 -> ebfc04d85 (forced update)
HBASE-21560 Return a new TableDescriptor for MasterObserver#preModifyTable to allow coprocessors to modify the TableDescriptor
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79d90c87
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79d90c87
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79d90c87
Branch: refs/heads/HBASE-20952
Commit: 79d90c87b5bc6d4aa50e6edc52a3f20da708ee29
Parents: 8d7061a
Author: Guanghao Zhang <zg...@apache.org>
Authored: Fri Dec 7 16:51:19 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Sat Dec 8 09:28:14 2018 +0800
----------------------------------------------------------------------
.../hbase/coprocessor/MasterObserver.java | 6 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 11 +-
.../hbase/master/MasterCoprocessorHost.java | 22 ++--
.../hbase/security/access/AccessController.java | 9 +-
.../CoprocessorWhitelistMasterObserver.java | 5 +-
.../visibility/VisibilityController.java | 15 ++-
.../hbase/coprocessor/TestMasterObserver.java | 3 +-
.../TestMasterObserverToModifyTableSchema.java | 128 +++++++++++++++++++
8 files changed, 169 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index a0863e4..1a8db79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -240,11 +240,13 @@ public interface MasterObserver {
* @param currentDescriptor current TableDescriptor of the table
* @param newDescriptor after modify operation, table will have this descriptor
*/
- default void preModifyTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ default TableDescriptor preModifyTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
- throws IOException {
+ throws IOException {
preModifyTable(ctx, tableName, newDescriptor);
+ return newDescriptor;
}
+
/**
* Called after the modifyTable operation has been requested. Called as part
* of modify table RPC call.
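With this change the master applies whatever descriptor preModifyTable returns, so a coprocessor can rewrite the requested schema instead of only inspecting or vetoing it; the default implementation simply returns newDescriptor unchanged. For illustration only (not part of this commit; the class name and the durability tweak are assumptions), a minimal observer using the new contract:

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;

public class ForceSyncWalObserver implements MasterCoprocessor, MasterObserver {

  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  @Override
  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
      throws IOException {
    // Rebuild the incoming descriptor; the returned value is what the master
    // passes on to the sanity checks and the modify-table procedure.
    return TableDescriptorBuilder.newBuilder(newDescriptor)
        .setDurability(Durability.SYNC_WAL)
        .build();
  }
}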
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e96dc36..a16e09d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -2631,13 +2631,12 @@ public class HMaster extends HRegionServer implements MasterServices {
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
- TableDescriptor newDescriptor = newDescriptorGetter.get();
- sanityCheckTableDescriptor(newDescriptor);
TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
- getMaster().getMasterCoprocessorHost().preModifyTable(tableName, oldDescriptor,
- newDescriptor);
-
- LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
+ TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
+ .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
+ sanityCheckTableDescriptor(newDescriptor);
+ LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
+ oldDescriptor, newDescriptor);
// Execute the operation synchronously - wait for the operation completes before
// continuing.
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 51e30c4..e7b166c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -446,14 +446,20 @@ public class MasterCoprocessorHost
});
}
- public void preModifyTable(final TableName tableName, final TableDescriptor currentDescriptor,
- final TableDescriptor newDescriptor) throws IOException {
- execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
- @Override
- public void call(MasterObserver observer) throws IOException {
- observer.preModifyTable(this, tableName, currentDescriptor, newDescriptor);
- }
- });
+ public TableDescriptor preModifyTable(final TableName tableName,
+ final TableDescriptor currentDescriptor, final TableDescriptor newDescriptor)
+ throws IOException {
+ if (coprocEnvironments.isEmpty()) {
+ return newDescriptor;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<MasterObserver, TableDescriptor>(masterObserverGetter,
+ newDescriptor) {
+ @Override
+ protected TableDescriptor call(MasterObserver observer) throws IOException {
+ return observer.preModifyTable(this, tableName, currentDescriptor, getResult());
+ }
+ });
}
public void postModifyTable(final TableName tableName, final TableDescriptor oldDescriptor,
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 835fc0d..82ec12d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -970,11 +970,12 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
}
@Override
- public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
- TableDescriptor currentDesc, TableDescriptor newDesc) throws IOException {
+ public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
+ TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc)
+ throws IOException {
// TODO: potentially check if this is a add/modify/delete column operation
- requirePermission(c, "modifyTable",
- tableName, null, null, Action.ADMIN, Action.CREATE);
+ requirePermission(c, "modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
+ return newDesc;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
index 719fe33..1e83e96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
@@ -54,10 +54,11 @@ public class CoprocessorWhitelistMasterObserver implements MasterCoprocessor, Ma
}
@Override
- public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc)
- throws IOException {
+ throws IOException {
verifyCoprocessors(ctx, newDesc);
+ return newDesc;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 1f54afc..c4f3b95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -226,14 +226,15 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
}
@Override
- public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName,
- TableDescriptor currentDescriptor, TableDescriptor newDescriptor) throws IOException {
- if (!authorizationEnabled) {
- return;
- }
- if (LABELS_TABLE_NAME.equals(tableName)) {
- throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
+ public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
+ throws IOException {
+ if (authorizationEnabled) {
+ if (LABELS_TABLE_NAME.equals(tableName)) {
+ throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
+ }
}
+ return newDescriptor;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
index d8a5b4c..58f4b9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
@@ -373,10 +373,11 @@ public class TestMasterObserver {
}
@Override
- public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
+ public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
TableName tableName, final TableDescriptor currentDescriptor,
final TableDescriptor newDescriptor) throws IOException {
preModifyTableCalled = true;
+ return newDescriptor;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java
new file mode 100644
index 0000000..d23a4a8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestMasterObserverToModifyTableSchema {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMasterObserverToModifyTableSchema.class);
+
+ private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static TableName TABLENAME = TableName.valueOf("TestTable");
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ OnlyOneVersionAllowedMasterObserver.class.getName());
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMasterObserverToModifyTableSchema() throws IOException {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
+ for (int i = 1; i <= 3; i++) {
+ builder.setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf" + i)).setMaxVersions(i)
+ .build());
+ }
+ try (Admin admin = UTIL.getAdmin()) {
+ admin.createTable(builder.build());
+ assertOneVersion(admin.getDescriptor(TABLENAME));
+
+ builder.modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf1"))
+ .setMaxVersions(Integer.MAX_VALUE).build());
+ admin.modifyTable(builder.build());
+ assertOneVersion(admin.getDescriptor(TABLENAME));
+ }
+ }
+
+ private void assertOneVersion(TableDescriptor td) {
+ for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+ assertEquals(1, cfd.getMaxVersions());
+ }
+ }
+
+ public static class OnlyOneVersionAllowedMasterObserver
+ implements MasterCoprocessor, MasterObserver {
+
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public TableDescriptor preCreateTableRegionsInfos(
+ ObserverContext<MasterCoprocessorEnvironment> ctx, TableDescriptor desc)
+ throws IOException {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
+ for (ColumnFamilyDescriptor cfd : desc.getColumnFamilies()) {
+ builder.modifyColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(cfd).setMaxVersions(1).build());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
+ TableName tableName, final TableDescriptor currentDescriptor,
+ final TableDescriptor newDescriptor) throws IOException {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(newDescriptor);
+ for (ColumnFamilyDescriptor cfd : newDescriptor.getColumnFamilies()) {
+ builder.modifyColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(cfd).setMaxVersions(1).build());
+ }
+ return builder.build();
+ }
+ }
+}
[07/11] hbase git commit: HBASE-21582 If HBaseAdmin#snapshotAsync is
called but isSnapshotFinished is never called, then
SnapshotHFileCleaner will skip running every time
Posted by el...@apache.org.
HBASE-21582 If HBaseAdmin#snapshotAsync is called but isSnapshotFinished is never called, then SnapshotHFileCleaner will skip running every time
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f32d2618
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f32d2618
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f32d2618
Branch: refs/heads/HBASE-20952
Commit: f32d2618430f70e1b0db92785294b2c7892cc02b
Parents: 4640ff5
Author: huzheng <op...@gmail.com>
Authored: Tue Dec 11 20:27:56 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Thu Dec 13 10:35:20 2018 +0800
----------------------------------------------------------------------
.../hbase/master/snapshot/SnapshotManager.java | 48 ++++++++++++++------
.../master/cleaner/TestSnapshotFromMaster.java | 27 ++++++++++-
.../master/snapshot/TestSnapshotManager.java | 36 +++++++++++++--
3 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
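The fix replaces the fixed 60-second sentinel retention with a background chore and a configurable timeout, so finished snapshot sentinels are reaped even when a client never polls isSnapshotFinished. A minimal sketch of tuning it (the five-second value is an arbitrary assumption, mirroring the TestSnapshotManager setup below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;

public class SnapshotSentinelTimeoutExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Finished sentinels are now removed by a scheduled cleaner once they are
    // older than this timeout, so SnapshotHFileCleaner can run again.
    conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 5 * 1000L);
  }
}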
http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 2b963b2..05db4ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -28,7 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -91,6 +95,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringP
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class manages the procedure of taking and restoring snapshots. There is only one
@@ -120,7 +126,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
* At this point, if the user asks for the snapshot/restore status, the result will be
* snapshot done if exists or failed if it doesn't exists.
*/
- private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
+ public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
+ "hbase.snapshot.sentinels.cleanup.timeoutMillis";
+ public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
/** Enable or disable snapshot support */
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
@@ -151,7 +159,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
// The map is always accessed and modified under the object lock using synchronized.
// snapshotTable() will insert an Handler in the table.
// isSnapshotDone() will remove the handler requested if the operation is finished.
- private Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
+ private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService scheduleThreadPool =
+ Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+ .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
+ private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
// Restore map, with table name as key, procedure ID as value.
// The map is always accessed and modified under the object lock using synchronized.
@@ -181,17 +193,21 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
* @param coordinator procedure coordinator instance. exposed for testing.
* @param pool HBase ExecutorServcie instance, exposed for testing.
*/
- public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
- ProcedureCoordinator coordinator, ExecutorService pool)
+ @VisibleForTesting
+ SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
+ ExecutorService pool, int sentinelCleanInterval)
throws IOException, UnsupportedOperationException {
this.master = master;
this.rootDir = master.getMasterFileSystem().getRootDir();
- checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
+ Configuration conf = master.getConfiguration();
+ checkSnapshotSupport(conf, master.getMasterFileSystem());
this.coordinator = coordinator;
this.executorService = pool;
resetTempDir();
+ snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
+ this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
}
/**
@@ -274,7 +290,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
*
* @throws IOException if we can't reach the filesystem
*/
- void resetTempDir() throws IOException {
+ private void resetTempDir() throws IOException {
// cleanup any existing snapshots.
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
master.getConfiguration());
@@ -290,7 +306,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
* @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
* @throws IOException For filesystem IOExceptions
*/
- public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
+ public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
// check to see if it is completed
if (!isSnapshotCompleted(snapshot)) {
throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
@@ -934,7 +950,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
this.restoreTableToProcIdMap.remove(tableName);
return false;
}
-
}
/**
@@ -989,14 +1004,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
*/
private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
long currentTime = EnvironmentEdgeManager.currentTime();
- Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
- sentinels.entrySet().iterator();
+ long sentinelsCleanupTimeoutMillis =
+ master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
+ SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
+ Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TableName, SnapshotSentinel> entry = it.next();
SnapshotSentinel sentinel = entry.getValue();
- if (sentinel.isFinished() &&
- (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
- {
+ if (sentinel.isFinished()
+ && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
it.remove();
}
}
@@ -1031,7 +1047,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
snapshotHandler.cancel(why);
}
-
+ if (snapshotHandlerChoreCleanerTask != null) {
+ snapshotHandlerChoreCleanerTask.cancel(true);
+ }
try {
if (coordinator != null) {
coordinator.close();
@@ -1166,6 +1184,8 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
this.executorService = master.getExecutorService();
resetTempDir();
+ snapshotHandlerChoreCleanerTask =
+ scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index 9d76ede..cc2ee06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -25,6 +26,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.regex.Pattern;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,8 +35,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.SnapshotType;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
@@ -129,11 +136,11 @@ public class TestSnapshotFromMaster {
conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, "");
// Enable snapshot
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 3 * 1000L);
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
ConstantSizeRegionSplitPolicy.class.getName());
conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
-
}
@Before
@@ -419,4 +426,22 @@ public class TestSnapshotFromMaster {
builder.commit();
return builder.getSnapshotDescription();
}
+
+ @Test
+ public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
+ // Write some data
+ Table table = UTIL.getConnection().getTable(TABLE_NAME);
+ for (int i = 0; i < 10; i++) {
+ Put put = new Put(Bytes.toBytes(i)).addColumn(TEST_FAM, Bytes.toBytes("q"), Bytes.toBytes(i));
+ table.put(put);
+ }
+ String snapshotName = "testAsyncSnapshotWillNotBlockSnapshotHFileCleaner01";
+ UTIL.getAdmin().snapshotAsync(new org.apache.hadoop.hbase.client.SnapshotDescription(
+ snapshotName, TABLE_NAME, SnapshotType.FLUSH));
+ Waiter.waitFor(UTIL.getConfiguration(), 10 * 1000L, 200L,
+ () -> UTIL.getAdmin().listSnapshots(Pattern.compile(snapshotName)).size() == 1);
+ assertTrue(master.getSnapshotManager().isTakingAnySnapshot());
+ Thread.sleep(11 * 1000L);
+ assertFalse(master.getSnapshotManager().isTakingAnySnapshot());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
index 3a6a61f..ff903c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
@@ -62,7 +61,6 @@ public class TestSnapshotManager {
public TestName name = new TestName();
MasterServices services = Mockito.mock(MasterServices.class);
- MetricsMaster metrics = Mockito.mock(MetricsMaster.class);
ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
ExecutorService pool = Mockito.mock(ExecutorService.class);
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
@@ -79,14 +77,44 @@ public class TestSnapshotManager {
return getNewManager(UTIL.getConfiguration());
}
- private SnapshotManager getNewManager(final Configuration conf)
+ private SnapshotManager getNewManager(Configuration conf) throws IOException, KeeperException {
+ return getNewManager(conf, 1);
+ }
+
+ private SnapshotManager getNewManager(Configuration conf, int intervalSeconds)
throws IOException, KeeperException {
Mockito.reset(services);
Mockito.when(services.getConfiguration()).thenReturn(conf);
Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
Mockito.when(mfs.getFileSystem()).thenReturn(fs);
Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir());
- return new SnapshotManager(services, metrics, coordinator, pool);
+ return new SnapshotManager(services, coordinator, pool, intervalSeconds);
+ }
+
+ @Test
+ public void testCleanFinishedHandler() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ Configuration conf = UTIL.getConfiguration();
+ try {
+ conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 5 * 1000L);
+ SnapshotManager manager = getNewManager(conf, 1);
+ TakeSnapshotHandler handler = Mockito.mock(TakeSnapshotHandler.class);
+ assertFalse("Manager is in process when there is no current handler",
+ manager.isTakingSnapshot(tableName));
+ manager.setSnapshotHandlerForTesting(tableName, handler);
+ Mockito.when(handler.isFinished()).thenReturn(false);
+ assertTrue(manager.isTakingAnySnapshot());
+ assertTrue("Manager isn't in process when handler is running",
+ manager.isTakingSnapshot(tableName));
+ Mockito.when(handler.isFinished()).thenReturn(true);
+ assertFalse("Manager is process when handler isn't running",
+ manager.isTakingSnapshot(tableName));
+ assertTrue(manager.isTakingAnySnapshot());
+ Thread.sleep(6 * 1000);
+ assertFalse(manager.isTakingAnySnapshot());
+ } finally {
+ conf.unset(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS);
+ }
}
@Test
[04/11] hbase git commit: HBASE-21453 Convert ReadOnlyZKClient logging
to DEBUG instead of INFO
Posted by el...@apache.org.
HBASE-21453 Convert ReadOnlyZKClient logging to DEBUG instead of INFO
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f88224ee
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f88224ee
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f88224ee
Branch: refs/heads/HBASE-20952
Commit: f88224ee34ba2c23f794ec1219ffd93783b20e51
Parents: b09b87d
Author: Sakthi <ja...@cloudera.com>
Authored: Thu Nov 29 18:52:50 2018 -0800
Committer: Peter Somogyi <ps...@apache.org>
Committed: Tue Dec 11 08:18:02 2018 +0100
----------------------------------------------------------------------
.../java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f88224ee/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
index fc2d5f0..09f8984 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
@@ -136,7 +136,7 @@ public final class ReadOnlyZKClient implements Closeable {
this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
- LOG.info(
+ LOG.debug(
"Connect {} to {} with session timeout={}ms, retries {}, " +
"retry interval {}ms, keepAlive={}ms",
getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
@@ -347,7 +347,7 @@ public final class ReadOnlyZKClient implements Closeable {
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
- LOG.info("Close zookeeper connection {} to {}", getId(), connectString);
+ LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
tasks.add(CLOSE);
}
}
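Deployments that relied on these connect/close lines at INFO now need DEBUG enabled for this class. A minimal sketch, assuming the log4j 1.x backend (the same org.apache.log4j API referenced elsewhere in this digest, e.g. in hirb.rb):

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class ReadOnlyZKClientLogLevel {
  public static void main(String[] args) {
    // Re-surface the connection lifecycle messages that moved to DEBUG.
    Logger.getLogger("org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient")
        .setLevel(Level.DEBUG);
  }
}

The equivalent log4j.properties entry would be
log4j.logger.org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient=DEBUG.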
[05/11] hbase git commit: HBASE-21568 Use CacheConfig.DISABLED where
we don't expect to have blockcache running
Posted by el...@apache.org.
HBASE-21568 Use CacheConfig.DISABLED where we don't expect to have blockcache running
This includes removing the "old way" of disabling blockcache in favor of the
new API.
Signed-off-by: Guanghao Zhang <zg...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67d6d508
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67d6d508
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67d6d508
Branch: refs/heads/HBASE-20952
Commit: 67d6d5084cf8fc094cda4bd3f091d8a0a9cb1d3e
Parents: f88224e
Author: Josh Elser <el...@apache.org>
Authored: Fri Dec 7 17:18:49 2018 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Dec 11 10:02:18 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java | 6 ++----
.../src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java | 4 +---
.../org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java | 2 +-
.../org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java | 6 +++---
.../java/org/apache/hadoop/hbase/util/CompressionTest.java | 2 +-
.../src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java | 5 ++---
.../apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java | 2 +-
7 files changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
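For callers the migration is mechanical: every new CacheConfig(tempConf) built around a zeroed hfile.block.cache.size becomes the shared CacheConfig.DISABLED constant. A minimal sketch of reading an HFile without touching the block cache (the path is an illustrative assumption):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class DisabledCacheReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // CacheConfig.DISABLED makes the intent explicit instead of relying on a
    // copied Configuration with the cache size forced to 0.0f.
    HFile.Reader reader = HFile.createReader(fs, new Path("/tmp/example.hfile"),
        CacheConfig.DISABLED, true, conf);
    reader.close();
  }
}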
http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index c911e8c..274a506 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -414,8 +414,6 @@ public class HFileOutputFormat2
DataBlockEncoding encoding = overriddenEncoding;
encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
- Configuration tempConf = new Configuration(conf);
- tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
HFileContextBuilder contextBuilder = new HFileContextBuilder()
.withCompression(compression)
.withChecksumType(HStore.getChecksumType(conf))
@@ -430,12 +428,12 @@ public class HFileOutputFormat2
HFileContext hFileContext = contextBuilder.build();
if (null == favoredNodes) {
wl.writer =
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+ new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
.withOutputDir(familydir).withBloomType(bloomType)
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
} else {
wl.writer =
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+ new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
.withOutputDir(familydir).withBloomType(bloomType)
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 5bcaa17..78ebedc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -356,9 +356,7 @@ public class HFile {
*/
public static final WriterFactory getWriterFactoryNoCache(Configuration
conf) {
- Configuration tempConf = new Configuration(conf);
- tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
- return HFile.getWriterFactory(conf, new CacheConfig(tempConf));
+ return HFile.getWriterFactory(conf, CacheConfig.DISABLED);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 82e881b..5a6f6c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -309,7 +309,7 @@ public class HFilePrettyPrinter extends Configured implements Tool {
return -2;
}
- HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), true, getConf());
+ HFile.Reader reader = HFile.createReader(fs, file, CacheConfig.DISABLED, true, getConf());
Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index e027ac6..3320b1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -710,7 +710,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
Path hfilePath = item.getFilePath();
Optional<byte[]> first, last;
try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
- new CacheConfig(getConf()), true, getConf())) {
+ CacheConfig.DISABLED, true, getConf())) {
hfr.loadFileInfo();
first = hfr.getFirstRowKey();
last = hfr.getLastRowKey();
@@ -847,7 +847,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws IOException {
Path hfile = hfileStatus.getPath();
try (HFile.Reader reader =
- HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
+ HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
builder.setCompressionType(reader.getFileContext().getCompression());
LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
@@ -1083,7 +1083,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
FileSystem fs = inFile.getFileSystem(conf);
- CacheConfig cacheConf = new CacheConfig(conf);
+ CacheConfig cacheConf = CacheConfig.DISABLED;
HalfStoreFileReader halfReader = null;
StoreFileWriter halfWriter = null;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index b6af8a5..dcdd12e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -134,7 +134,7 @@ public class CompressionTest {
writer.appendFileInfo(Bytes.toBytes("compressioninfokey"), Bytes.toBytes("compressioninfoval"));
writer.close();
Cell cc = null;
- HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
+ HFile.Reader reader = HFile.createReader(fs, path, CacheConfig.DISABLED, true, conf);
try {
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 14706c5..8176942 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -922,7 +922,7 @@ public class HBaseFsck extends Configured implements Closeable {
// For all the stores in this column family.
for (FileStatus storeFile : storeFiles) {
HFile.Reader reader = HFile.createReader(fs, storeFile.getPath(),
- new CacheConfig(getConf()), true, getConf());
+ CacheConfig.DISABLED, true, getConf());
if ((reader.getFirstKey() != null)
&& ((storeFirstKey == null) || (comparator.compare(storeFirstKey,
((KeyValue.KeyOnlyKeyValue) reader.getFirstKey().get()).getKey()) > 0))) {
@@ -1025,8 +1025,7 @@ public class HBaseFsck extends Configured implements Closeable {
byte[] start, end;
HFile.Reader hf = null;
try {
- CacheConfig cacheConf = new CacheConfig(getConf());
- hf = HFile.createReader(fs, hfile.getPath(), cacheConf, true, getConf());
+ hf = HFile.createReader(fs, hfile.getPath(), CacheConfig.DISABLED, true, getConf());
hf.loadFileInfo();
Optional<Cell> startKv = hf.getFirstKey();
start = CellUtil.cloneRow(startKv.get());
http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
index e937fa5..41f3cde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
@@ -82,7 +82,7 @@ public class HFileCorruptionChecker {
boolean quarantine) throws IOException {
this.conf = conf;
this.fs = FileSystem.get(conf);
- this.cacheConf = new CacheConfig(conf);
+ this.cacheConf = CacheConfig.DISABLED;
this.executor = executor;
this.inQuarantineMode = quarantine;
}
[11/11] hbase git commit: HBASE-20952 run the extended tests weekly
until branch activity picks up.
Posted by el...@apache.org.
HBASE-20952 run the extended tests weekly until branch activity picks up.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ebfc04d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ebfc04d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ebfc04d8
Branch: refs/heads/HBASE-20952
Commit: ebfc04d85e44d5af75afa68789ef1ae5e3b0ed35
Parents: 3ff274e
Author: Sean Busbey <bu...@apache.org>
Authored: Fri Nov 16 07:51:08 2018 -0600
Committer: Josh Elser <el...@apache.org>
Committed: Thu Dec 13 16:57:55 2018 -0500
----------------------------------------------------------------------
dev-support/Jenkinsfile | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ebfc04d8/dev-support/Jenkinsfile
----------------------------------------------------------------------
diff --git a/dev-support/Jenkinsfile b/dev-support/Jenkinsfile
index b333afb..bea425a 100644
--- a/dev-support/Jenkinsfile
+++ b/dev-support/Jenkinsfile
@@ -21,7 +21,7 @@ pipeline {
}
}
triggers {
- cron('@daily')
+ cron('@weekly')
}
options {
buildDiscarder(logRotator(numToKeepStr: '30'))
[02/11] hbase git commit: HBASE-21567 Allow overriding configs when
starting up the shell
Posted by el...@apache.org.
HBASE-21567 Allow overriding configs when starting up the shell
Adds support for -D as an option to 'hbase shell'
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da9508d4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da9508d4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da9508d4
Branch: refs/heads/HBASE-20952
Commit: da9508d4271ea12410e289692f10791b0e05266b
Parents: 79d90c8
Author: stack <st...@apache.org>
Authored: Thu Dec 6 23:05:21 2018 -0800
Committer: stack <st...@apache.org>
Committed: Sat Dec 8 15:08:19 2018 -0800
----------------------------------------------------------------------
bin/hirb.rb | 40 ++++++++++++++++++++++++-----
src/main/asciidoc/_chapters/shell.adoc | 16 ++++++++++++
2 files changed, 49 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/da9508d4/bin/hirb.rb
----------------------------------------------------------------------
diff --git a/bin/hirb.rb b/bin/hirb.rb
index 790ecdc..e857db7 100644
--- a/bin/hirb.rb
+++ b/bin/hirb.rb
@@ -54,21 +54,47 @@ $LOAD_PATH.unshift Pathname.new(sources)
cmdline_help = <<HERE # HERE document output as shell usage
Usage: shell [OPTIONS] [SCRIPTFILE [ARGUMENTS]]
- -d | --debug Set DEBUG log levels.
- -h | --help This help.
- -n | --noninteractive Do not run within an IRB session
- and exit with non-zero status on
- first error.
+ -d | --debug Set DEBUG log levels.
+ -h | --help This help.
+ -n | --noninteractive Do not run within an IRB session and exit with non-zero
+ status on first error.
+ -Dkey=value Pass hbase-*.xml Configuration overrides. For example, to
+ use an alternate zookeeper ensemble, pass:
+ -Dhbase.zookeeper.quorum=zookeeper.example.org
+ For faster fail, pass the below and vary the values:
+ -Dhbase.client.retries.number=7
+ -Dhbase.ipc.client.connect.max.retries=3
HERE
+
+# Takes configuration and an arg that is expected to be key=value format.
+# If c is empty, creates one and returns it
+def add_to_configuration(c, arg)
+ kv = arg.split('=')
+ kv.length == 2 || (raise "Expected parameter #{kv} in key=value format")
+ c = org.apache.hadoop.hbase.HBaseConfiguration.create if c.nil?
+ c.set(kv[0], kv[1])
+ c
+end
+
found = []
script2run = nil
log_level = org.apache.log4j.Level::ERROR
@shell_debug = false
interactive = true
-for arg in ARGV
+_configuration = nil
+D_ARG = '-D'
+while (arg = ARGV.shift)
if arg == '-h' || arg == '--help'
puts cmdline_help
exit
+ elsif arg == D_ARG
+ argValue = ARGV.shift || (raise "#{D_ARG} takes a 'key=value' parameter")
+ _configuration = add_to_configuration(_configuration, argValue)
+ found.push(arg)
+ found.push(argValue)
+ elsif arg.start_with? D_ARG
+ _configuration = add_to_configuration(_configuration, arg[2..-1])
+ found.push(arg)
elsif arg == '-d' || arg == '--debug'
log_level = org.apache.log4j.Level::DEBUG
$fullBackTrace = true
@@ -111,7 +137,7 @@ require 'shell'
require 'shell/formatter'
# Setup the HBase module. Create a configuration.
-@hbase = Hbase::Hbase.new
+@hbase = _configuration.nil? ? Hbase::Hbase.new : Hbase::Hbase.new(_configuration)
# Setup console
@shell = Shell::Shell.new(@hbase, interactive)
http://git-wip-us.apache.org/repos/asf/hbase/blob/da9508d4/src/main/asciidoc/_chapters/shell.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/shell.adoc b/src/main/asciidoc/_chapters/shell.adoc
index 5612e1d..cdfa828 100644
--- a/src/main/asciidoc/_chapters/shell.adoc
+++ b/src/main/asciidoc/_chapters/shell.adoc
@@ -58,6 +58,7 @@ To run one of these files, do as follows:
$ ./bin/hbase org.jruby.Main PATH_TO_SCRIPT
----
+
== Running the Shell in Non-Interactive Mode
A new non-interactive mode has been added to the HBase Shell (link:https://issues.apache.org/jira/browse/HBASE-11658[HBASE-11658)].
@@ -213,6 +214,21 @@ $ HBASE_SHELL_OPTS="-verbose:gc -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCD
-XX:+PrintGCDetails -Xloggc:$HBASE_HOME/logs/gc-hbase.log" ./bin/hbase shell
----
+== Overriding configuration starting the HBase Shell
+
+As of hbase-2.0.5/hbase-2.1.3/hbase-2.2.0/hbase-1.4.10/hbase-1.5.0, you can
+pass or override hbase configuration as specified in `hbase-*.xml` by passing
+your key/values prefixed with `-D` on the command-line as follows:
+[source,bash]
+----
+$ ./bin/hbase shell -Dhbase.zookeeper.quorum=ZK0.remote.cluster.example.org,ZK1.remote.cluster.example.org,ZK2.remote.cluster.example.org -Draining=false
+...
+hbase(main):001:0> @shell.hbase.configuration.get("hbase.zookeeper.quorum")
+=> "ZK0.remote.cluster.example.org,ZK1.remote.cluster.example.org,ZK2.remote.cluster.example.org"
+hbase(main):002:0> @shell.hbase.configuration.get("raining")
+=> "false"
+----
+
== Shell Tricks
=== Table variables
[06/11] hbase git commit: HBASE-21575 : memstore above high watermark
message is logged too much
Posted by el...@apache.org.
HBASE-21575 : memstore above high watermark message is logged too much
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4640ff59
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4640ff59
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4640ff59
Branch: refs/heads/HBASE-20952
Commit: 4640ff5959af4865966126a503a7cd15e26a7408
Parents: 67d6d50
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 12 11:02:25 2018 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 12 11:02:25 2018 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/regionserver/MemStoreFlusher.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4640ff59/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 699c9b6..804a2f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -703,6 +703,7 @@ class MemStoreFlusher implements FlushRequester {
if (flushType != FlushType.NORMAL) {
TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
long start = EnvironmentEdgeManager.currentTime();
+ long nextLogTimeMs = start;
synchronized (this.blockSignal) {
boolean blocked = false;
long startTime = 0;
@@ -744,8 +745,11 @@ class MemStoreFlusher implements FlushRequester {
LOG.warn("Interrupted while waiting");
interrupted = true;
}
- long took = EnvironmentEdgeManager.currentTime() - start;
- LOG.warn("Memstore is above high water mark and block " + took + "ms");
+ long nowMs = EnvironmentEdgeManager.currentTime();
+ if (nowMs >= nextLogTimeMs) {
+ LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
+ nextLogTimeMs = nowMs + 1000;
+ }
flushType = isAboveHighWaterMark();
}
} finally {
[03/11] hbase git commit: HBASE-21570 Add write buffer periodic flush
support for AsyncBufferedMutator
Posted by el...@apache.org.
HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b09b87d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b09b87d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b09b87d1
Branch: refs/heads/HBASE-20952
Commit: b09b87d143730db00ec56114a752d3a74f8982c4
Parents: da9508d
Author: zhangduo <zh...@apache.org>
Authored: Tue Dec 11 08:39:43 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Tue Dec 11 14:51:26 2018 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncBufferedMutator.java | 16 +-
.../client/AsyncBufferedMutatorBuilder.java | 19 +++
.../client/AsyncBufferedMutatorBuilderImpl.java | 19 ++-
.../hbase/client/AsyncBufferedMutatorImpl.java | 67 +++++---
.../client/AsyncConnectionConfiguration.java | 37 +++--
.../hbase/client/AsyncConnectionImpl.java | 11 +-
.../hbase/client/TestAsyncBufferMutator.java | 161 ++++++++++++++++++-
7 files changed, 277 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
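A minimal usage sketch of the new periodic flush (connection setup, table name, and the single Put are illustrative assumptions):

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PeriodicFlushExample {
  public static void main(String[] args) throws Exception {
    try (AsyncConnection conn =
        ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(TableName.valueOf("t1"))
          // Flush buffered mutations automatically after one second, even if
          // the write buffer size threshold is never reached.
          .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
          .build();
      mutator.mutate(new Put(Bytes.toBytes("row1"))
          .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"))).get();
      mutator.close();
    }
  }
}

Setting the timeout to 0 or a negative value (or calling disableWriteBufferPeriodicFlush()) keeps the old size-only flushing behavior.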
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index 6fe4b9a..7b21eb5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -18,13 +18,16 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
/**
* Used to communicate with a single HBase table in batches. Obtain an instance from a
* {@link AsyncConnection} and call {@link #close()} afterwards.
@@ -52,7 +55,9 @@ public interface AsyncBufferedMutator extends Closeable {
* part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
* @param mutation The data to send.
*/
- CompletableFuture<Void> mutate(Mutation mutation);
+ default CompletableFuture<Void> mutate(Mutation mutation) {
+ return Iterables.getOnlyElement(mutate(Collections.singletonList(mutation)));
+ }
/**
* Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire
@@ -81,4 +86,11 @@ public interface AsyncBufferedMutator extends Closeable {
* @return The size of the write buffer in bytes.
*/
long getWriteBufferSize();
+
+ /**
+ * Returns the periodic flush interval in the given unit; 0 means periodic flush is disabled.
+ */
+ default long getPeriodicalFlushTimeout(TimeUnit unit) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index 45959bb..c617c8e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -46,6 +46,25 @@ public interface AsyncBufferedMutatorBuilder {
AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
/**
+ * Set the periodic flush interval. If the data in the buffer has not been flushed for longer
+ * than this timeout, it will be flushed automatically.
+ * <p/>
+ * Notice that setting the timeout to 0 or a negative value disables periodic flush; it does not
+ * mean 'flush immediately'. If you want to flush immediately then you should not use this class,
+ * as it is designed to be 'buffered'.
+ */
+ default AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
+ * Disable periodic flush, i.e., set the timeout to 0.
+ */
+ default AsyncBufferedMutatorBuilder disableWriteBufferPeriodicFlush() {
+ return setWriteBufferPeriodicFlush(0, TimeUnit.NANOSECONDS);
+ }
+
+ /**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
* <p>
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index 227d02b..eb8af17 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
/**
* The implementation of {@link AsyncBufferedMutatorBuilder}.
@@ -28,14 +29,20 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@InterfaceAudience.Private
class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
+ private final HashedWheelTimer periodicalFlushTimer;
+
private final AsyncTableBuilder<?> tableBuilder;
private long writeBufferSize;
+ private long periodicFlushTimeoutNs;
+
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
- AsyncTableBuilder<?> tableBuilder) {
+ AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
+ this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
+ this.periodicalFlushTimer = periodicalFlushTimer;
}
@Override
@@ -77,8 +84,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
}
@Override
- public AsyncBufferedMutator build() {
- return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
+ public AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+ this.periodicFlushTimeoutNs = unit.toNanos(timeout);
+ return this;
}
+ @Override
+ public AsyncBufferedMutator build() {
+ return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
+ periodicFlushTimeoutNs);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 5a92ace..318c6c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -29,16 +30,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
/**
* The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
*/
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
+ private final HashedWheelTimer periodicalFlushTimer;
+
private final AsyncTable<?> table;
private final long writeBufferSize;
+ private final long periodicFlushTimeoutNs;
+
private List<Mutation> mutations = new ArrayList<>();
private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -47,9 +56,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
private boolean closed;
- AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) {
+ @VisibleForTesting
+ Timeout periodicFlushTask;
+
+ AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+ long writeBufferSize, long periodicFlushTimeoutNs) {
+ this.periodicalFlushTimer = periodicalFlushTimer;
this.table = table;
this.writeBufferSize = writeBufferSize;
+ this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
}
@Override
@@ -62,7 +77,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
return table.getConfiguration();
}
- private void internalFlush() {
+ // will be overridden in test
+ @VisibleForTesting
+ protected void internalFlush() {
+ if (periodicFlushTask != null) {
+ periodicFlushTask.cancel();
+ periodicFlushTask = null;
+ }
List<Mutation> toSend = this.mutations;
if (toSend.isEmpty()) {
return;
@@ -86,29 +107,10 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
}
@Override
- public CompletableFuture<Void> mutate(Mutation mutation) {
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- long heapSize = mutation.heapSize();
- synchronized (this) {
- if (closed) {
- future.completeExceptionally(new IOException("Already closed"));
- return future;
- }
- mutations.add(mutation);
- futures.add(future);
- bufferedSize += heapSize;
- if (bufferedSize >= writeBufferSize) {
- internalFlush();
- }
- }
- return future;
- }
-
- @Override
public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
List<CompletableFuture<Void>> futures =
- Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
- .collect(Collectors.toList());
+ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
+ .collect(Collectors.toList());
long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
synchronized (this) {
if (closed) {
@@ -116,6 +118,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
futures.forEach(f -> f.completeExceptionally(ioe));
return futures;
}
+ if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
+ periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
+ synchronized (AsyncBufferedMutatorImpl.this) {
+ // confirm that we are still valid: if an internalFlush call ran before us, we must not
+ // execute again. internalFlush sets periodicFlushTask to null, and a new task may have
+ // been scheduled since, so we check whether the references are still equal.
+ if (timeout == periodicFlushTask) {
+ periodicFlushTask = null;
+ internalFlush();
+ }
+ }
+ }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+ }
this.mutations.addAll(mutations);
this.futures.addAll(futures);
bufferedSize += heapSize;
@@ -141,4 +157,9 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
public long getWriteBufferSize() {
return writeBufferSize;
}
+
+ @Override
+ public long getPeriodicalFlushTimeout(TimeUnit unit) {
+ return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+ }
}
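The timeout callback above only flushes if it is still the current task: internalFlush cancels and nulls out periodicFlushTask under the lock, and a replacement may be scheduled before a stale timeout fires. A standalone sketch of the same guard pattern, using a plain ScheduledExecutorService instead of HBase's HashedWheelTimer (all names here are illustrative, not HBase API):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicFlushPattern {
  private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
  private ScheduledFuture<?> pendingFlush; // guarded by 'this'

  synchronized void buffer(Runnable flushAction, long timeoutMs) {
    if (pendingFlush == null) {
      // hold the handle in a one-element array so the task can compare identities
      ScheduledFuture<?>[] self = new ScheduledFuture<?>[1];
      self[0] = timer.schedule(() -> {
        synchronized (this) {
          // run only if no flush superseded us; a stale task sees a different reference
          if (pendingFlush == self[0]) {
            pendingFlush = null;
            flushAction.run();
          }
        }
      }, timeoutMs, TimeUnit.MILLISECONDS);
      pendingFlush = self[0];
    }
  }

  synchronized void flushNow(Runnable flushAction) {
    if (pendingFlush != null) {
      pendingFlush.cancel(false);
      pendingFlush = null;
    }
    flushAction.run();
  }
}

The reference check matters because cancel() cannot guarantee that a timeout which is already firing will stop; the identity comparison under the shared lock covers that window.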
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index bd2add8..915e9dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,11 +39,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
@@ -91,32 +92,38 @@ class AsyncConnectionConfiguration {
private final long writeBufferSize;
+ private final long writeBufferPeriodicFlushTimeoutNs;
+
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
- this.rpcTimeoutNs = TimeUnit.MILLISECONDS
- .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+ this.rpcTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
this.readRpcTimeoutNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
this.writeRpcTimeoutNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
this.pauseNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt =
- conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+ conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.scanTimeoutNs = TimeUnit.MILLISECONDS
- .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
- HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
+ .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
this.scannerCaching =
- conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
- this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
+ conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ this.metaScannerCaching =
+ conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
- this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+ this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+ this.writeBufferPeriodicFlushTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
+ WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@@ -159,7 +166,7 @@ class AsyncConnectionConfiguration {
return scannerCaching;
}
- int getMetaScannerCaching(){
+ int getMetaScannerCaching() {
return metaScannerCaching;
}
@@ -170,4 +177,8 @@ class AsyncConnectionConfiguration {
long getWriteBufferSize() {
return writeBufferSize;
}
+
+ long getWriteBufferPeriodicFlushTimeoutNs() {
+ return writeBufferPeriodicFlushTimeoutNs;
+ }
}
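The timeout above is read from the same configuration key the synchronous BufferedMutator uses, via ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS. A sketch of setting it connection-wide follows; the literal key string is an assumption derived from that constant's name, so check ConnectionConfiguration for the canonical value.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

class PeriodicFlushConfig {
  static Configuration withPeriodicFlush(long timeoutMs) {
    Configuration conf = HBaseConfiguration.create();
    // key string assumed from WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS
    conf.setLong("hbase.client.write.buffer.periodicflush.timeout.ms", timeoutMs);
    return conf; // pass to ConnectionFactory.createAsyncConnection(conf)
  }
}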
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index a05764e..078395b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -68,7 +68,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@VisibleForTesting
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
- Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
+ Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
@@ -193,7 +193,7 @@ class AsyncConnectionImpl implements AsyncConnection {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);
this.masterStubMakeFuture.getAndSet(null)
- .completeExceptionally(new MasterNotRunningException(msg));
+ .completeExceptionally(new MasterNotRunningException(msg));
return;
}
try {
@@ -216,7 +216,7 @@ class AsyncConnectionImpl implements AsyncConnection {
});
} catch (IOException e) {
this.masterStubMakeFuture.getAndSet(null)
- .completeExceptionally(new IOException("Failed to create async master stub", e));
+ .completeExceptionally(new IOException("Failed to create async master stub", e));
}
});
}
@@ -317,12 +317,13 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
- return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName));
+ return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
}
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
ExecutorService pool) {
- return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
+ return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
+ RETRY_TIMER);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 9fe4ca7..6eed326 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -31,6 +33,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -45,12 +48,15 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBufferMutator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
+ HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -96,10 +102,10 @@ public class TestAsyncBufferMutator {
private void test(TableName tableName) throws InterruptedException {
List<CompletableFuture<Void>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator =
- CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
+ CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
- .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
- .collect(Collectors.toList()));
+ .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+ .collect(Collectors.toList()));
// exceeded the write buffer size, a flush will be called directly
fs.forEach(f -> f.join());
IntStream.range(COUNT / 2, COUNT).forEach(i -> {
@@ -115,9 +121,9 @@ public class TestAsyncBufferMutator {
futures.forEach(f -> f.join());
AsyncTable<?> table = CONN.getTable(tableName);
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
- .forEach(r -> {
- assertArrayEquals(VALUE, r.getValue(CF, CQ));
- });
+ .forEach(r -> {
+ assertArrayEquals(VALUE, r.getValue(CF, CQ));
+ });
}
@Test
@@ -142,4 +148,145 @@ public class TestAsyncBufferMutator {
}
}
}
+
+ @Test
+ public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
+ try (AsyncBufferedMutator mutator =
+ CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ CompletableFuture<?> future = mutator.mutate(put);
+ Thread.sleep(2000);
+ // assert that we have not flushed it out
+ assertFalse(future.isDone());
+ mutator.flush();
+ future.get();
+ }
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+
+ @Test
+ public void testPeriodicFlush() throws InterruptedException, ExecutionException {
+ AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
+ .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ CompletableFuture<?> future = mutator.mutate(put);
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+
+ // a bit deep into the implementation
+ @Test
+ public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
+ .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
+ .setWriteBufferSize(10 * put.heapSize()).build()) {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ futures.add(mutator.mutate(put));
+ Timeout task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ for (int i = 1;; i++) {
+ futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
+ if (mutator.periodicFlushTask == null) {
+ break;
+ }
+ }
+ assertTrue(task.isCancelled());
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ for (int i = 0; i < futures.size(); i++) {
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
+ }
+ }
+ }
+
+ @Test
+ public void testCancelPeriodicFlushByManuallyFlush()
+ throws InterruptedException, ExecutionException {
+ try (AsyncBufferedMutatorImpl mutator =
+ (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+ .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+ CompletableFuture<?> future =
+ mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+ Timeout task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ mutator.flush();
+ assertTrue(task.isCancelled());
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+ }
+
+ @Test
+ public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
+ CompletableFuture<?> future;
+ Timeout task;
+ try (AsyncBufferedMutatorImpl mutator =
+ (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+ .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+ future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+ task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ }
+ assertTrue(task.isCancelled());
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+
+ private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
+
+ private int flushCount;
+
+ AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+ long writeBufferSize, long periodicFlushTimeoutNs) {
+ super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
+ }
+
+ @Override
+ protected void internalFlush() {
+ flushCount++;
+ super.internalFlush();
+ }
+ }
+
+ @Test
+ public void testRaceBetweenNormalFlushAndPeriodicFlush()
+ throws InterruptedException, ExecutionException {
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ try (AsyncBufferMutatorForTest mutator =
+ new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
+ 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+ CompletableFuture<?> future = mutator.mutate(put);
+ Timeout task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ synchronized (mutator) {
+ // synchronized on mutator to prevent the periodic flush from executing
+ Thread.sleep(500);
+ // the timeout should be issued
+ assertTrue(task.isExpired());
+ // but no flush is issued as we hold the lock
+ assertEquals(0, mutator.flushCount);
+ assertFalse(future.isDone());
+ // manually flush, then release the lock
+ mutator.flush();
+ }
+ // this is a bit deep into the netty implementation, but anyway let's add a check here to
+ // confirm that an already-issued timeout cannot be cancelled by the netty framework.
+ assertFalse(task.isCancelled());
+ // and the mutation is done
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ // only the manual flush ran; the periodic flush should have been cancelled by us
+ assertEquals(1, mutator.flushCount);
+ }
+ }
}
[08/11] hbase git commit: HBASE-21590 Optimize trySkipToNextColumn in StoreScanner a bit.
Posted by el...@apache.org.
HBASE-21590 Optimize trySkipToNextColumn in StoreScanner a bit.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb1966dc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb1966dc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb1966dc
Branch: refs/heads/HBASE-20952
Commit: cb1966dc2d94fba10d9b6af3c5719e03f621df92
Parents: f32d261
Author: Lars Hofhansl <la...@apache.org>
Authored: Thu Dec 13 11:57:16 2018 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Thu Dec 13 11:57:16 2018 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/regionserver/StoreScanner.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb1966dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 736c08a..e7a4528 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -802,12 +802,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@VisibleForTesting
protected boolean trySkipToNextRow(Cell cell) throws IOException {
Cell nextCell = null;
+ // used to guard against a changed next indexed key by doing an identity comparison;
+ // when the identity changes we need to compare the bytes again
+ Cell previousIndexedKey = null;
do {
Cell nextIndexedKey = getNextIndexedKey();
if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
- && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
+ && (nextIndexedKey == previousIndexedKey || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) {
this.heap.next();
++kvsScanned;
+ previousIndexedKey = nextIndexedKey;
} else {
return false;
}
@@ -823,12 +827,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@VisibleForTesting
protected boolean trySkipToNextColumn(Cell cell) throws IOException {
Cell nextCell = null;
+ // used to guard against a changed next indexed key by doing an identity comparison;
+ // when the identity changes we need to compare the bytes again
+ Cell previousIndexedKey = null;
do {
Cell nextIndexedKey = getNextIndexedKey();
if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
- && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
+ && (nextIndexedKey == previousIndexedKey || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) {
this.heap.next();
++kvsScanned;
+ previousIndexedKey = nextIndexedKey;
} else {
return false;
}
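The patch caches the previously accepted nextIndexedKey and treats reference equality as proof that the earlier byte-level comparison still holds, so compareKeyForNextRow/compareKeyForNextColumn only run when the index key object actually changes. The same memoize-by-identity idea in a standalone, slightly generalized form (illustrative names, not HBase API):

import java.util.Comparator;

class IdentityMemoizedCheck<T> {
  private final Comparator<T> expensiveComparator;
  private T lastKey;          // last reference compared against the target
  private boolean lastResult; // cached outcome for that reference

  IdentityMemoizedCheck(Comparator<T> expensiveComparator) {
    this.expensiveComparator = expensiveComparator;
  }

  // identity check, not equals(): the same object implies the same bytes, so the
  // expensive comparison can be skipped until the reference changes. This assumes
  // a fixed target within the loop, as in StoreScanner where the cell is constant.
  boolean isAtOrPast(T key, T target) {
    if (key != lastKey) {
      lastResult = expensiveComparator.compare(key, target) >= 0;
      lastKey = key;
    }
    return lastResult;
  }
}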
[10/11] hbase git commit: HBASE-21575 : memstore above high watermark message is logged too much
Posted by el...@apache.org.
HBASE-21575 : memstore above high watermark message is logged too much
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ff274e2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ff274e2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ff274e2
Branch: refs/heads/HBASE-20952
Commit: 3ff274e22eb5710f4301fb0fce364e22a11288d7
Parents: 9a25d0c
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 12 11:02:25 2018 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Dec 13 12:47:11 2018 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/regionserver/MemStoreFlusher.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff274e2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 699c9b6..804a2f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -703,6 +703,7 @@ class MemStoreFlusher implements FlushRequester {
if (flushType != FlushType.NORMAL) {
TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
long start = EnvironmentEdgeManager.currentTime();
+ long nextLogTimeMs = start;
synchronized (this.blockSignal) {
boolean blocked = false;
long startTime = 0;
@@ -744,8 +745,11 @@ class MemStoreFlusher implements FlushRequester {
LOG.warn("Interrupted while waiting");
interrupted = true;
}
- long took = EnvironmentEdgeManager.currentTime() - start;
- LOG.warn("Memstore is above high water mark and block " + took + "ms");
+ long nowMs = EnvironmentEdgeManager.currentTime();
+ if (nowMs >= nextLogTimeMs) {
+ LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
+ nextLogTimeMs = nowMs + 1000;
+ }
flushType = isAboveHighWaterMark();
}
} finally {
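The fix throttles the warning to at most one line per second while the blocking loop spins, instead of logging on every iteration. The same throttle pattern in isolation (illustrative names):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ThrottledWarn {
  private static final Logger LOG = LoggerFactory.getLogger(ThrottledWarn.class);
  private static final long LOG_INTERVAL_MS = 1000;

  private long nextLogTimeMs = System.currentTimeMillis();

  // call from a tight wait loop; emits at most one warning per interval
  void warnBlocked(long startMs) {
    long nowMs = System.currentTimeMillis();
    if (nowMs >= nextLogTimeMs) {
      LOG.warn("blocked for {} ms", nowMs - startMs);
      nextLogTimeMs = nowMs + LOG_INTERVAL_MS;
    }
  }
}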
[09/11] hbase git commit: Revert "HIVE-21575 : memstore above high watermark message is logged too much"
Posted by el...@apache.org.
Revert "HIVE-21575 : memstore above high watermark message is logged too much"
This reverts commit 4640ff5959af4865966126a503a7cd15e26a7408.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a25d0c2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a25d0c2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a25d0c2
Branch: refs/heads/HBASE-20952
Commit: 9a25d0c249e595a1f8aef41cd677b44ff1c72d73
Parents: cb1966d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Dec 13 12:46:39 2018 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Dec 13 12:46:39 2018 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/regionserver/MemStoreFlusher.java | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a25d0c2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 804a2f8..699c9b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -703,7 +703,6 @@ class MemStoreFlusher implements FlushRequester {
if (flushType != FlushType.NORMAL) {
TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
long start = EnvironmentEdgeManager.currentTime();
- long nextLogTimeMs = start;
synchronized (this.blockSignal) {
boolean blocked = false;
long startTime = 0;
@@ -745,11 +744,8 @@ class MemStoreFlusher implements FlushRequester {
LOG.warn("Interrupted while waiting");
interrupted = true;
}
- long nowMs = EnvironmentEdgeManager.currentTime();
- if (nowMs >= nextLogTimeMs) {
- LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
- nextLogTimeMs = nowMs + 1000;
- }
+ long took = EnvironmentEdgeManager.currentTime() - start;
+ LOG.warn("Memstore is above high water mark and block " + took + "ms");
flushType = isAboveHighWaterMark();
}
} finally {