You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/13 21:58:10 UTC

[01/11] hbase git commit: HBASE-21560 Return a new TableDescriptor for MasterObserver#preModifyTable to allow coprocessor modify the TableDescriptor [Forced Update!]

Repository: hbase
Updated Branches:
  refs/heads/HBASE-20952 fb59426b7 -> ebfc04d85 (forced update)


HBASE-21560 Return a new TableDescriptor for MasterObserver#preModifyTable to allow coprocessor modify the TableDescriptor


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79d90c87
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79d90c87
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79d90c87

Branch: refs/heads/HBASE-20952
Commit: 79d90c87b5bc6d4aa50e6edc52a3f20da708ee29
Parents: 8d7061a
Author: Guanghao Zhang <zg...@apache.org>
Authored: Fri Dec 7 16:51:19 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Sat Dec 8 09:28:14 2018 +0800

----------------------------------------------------------------------
 .../hbase/coprocessor/MasterObserver.java       |   6 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  11 +-
 .../hbase/master/MasterCoprocessorHost.java     |  22 ++--
 .../hbase/security/access/AccessController.java |   9 +-
 .../CoprocessorWhitelistMasterObserver.java     |   5 +-
 .../visibility/VisibilityController.java        |  15 ++-
 .../hbase/coprocessor/TestMasterObserver.java   |   3 +-
 .../TestMasterObserverToModifyTableSchema.java  | 128 +++++++++++++++++++
 8 files changed, 169 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index a0863e4..1a8db79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -240,11 +240,13 @@ public interface MasterObserver {
    * @param currentDescriptor current TableDescriptor of the table
    * @param newDescriptor after modify operation, table will have this descriptor
    */
-  default void preModifyTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+  default TableDescriptor preModifyTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
       final TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
-    throws IOException {
+      throws IOException {
     preModifyTable(ctx, tableName, newDescriptor);
+    return newDescriptor;
   }
+
   /**
    * Called after the modifyTable operation has been requested.  Called as part
    * of modify table RPC call.

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e96dc36..a16e09d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -2631,13 +2631,12 @@ public class HMaster extends HRegionServer implements MasterServices {
         .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
           @Override
           protected void run() throws IOException {
-            TableDescriptor newDescriptor = newDescriptorGetter.get();
-            sanityCheckTableDescriptor(newDescriptor);
             TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
-            getMaster().getMasterCoprocessorHost().preModifyTable(tableName, oldDescriptor,
-              newDescriptor);
-
-            LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
+            TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
+                .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
+            sanityCheckTableDescriptor(newDescriptor);
+            LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
+                oldDescriptor, newDescriptor);
 
             // Execute the operation synchronously - wait for the operation completes before
             // continuing.

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 51e30c4..e7b166c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -446,14 +446,20 @@ public class MasterCoprocessorHost
     });
   }
 
-  public void preModifyTable(final TableName tableName, final TableDescriptor currentDescriptor,
-    final TableDescriptor newDescriptor) throws IOException {
-    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
-      @Override
-      public void call(MasterObserver observer) throws IOException {
-        observer.preModifyTable(this, tableName, currentDescriptor, newDescriptor);
-      }
-    });
+  public TableDescriptor preModifyTable(final TableName tableName,
+      final TableDescriptor currentDescriptor, final TableDescriptor newDescriptor)
+      throws IOException {
+    if (coprocEnvironments.isEmpty()) {
+      return newDescriptor;
+    }
+    return execOperationWithResult(
+        new ObserverOperationWithResult<MasterObserver, TableDescriptor>(masterObserverGetter,
+            newDescriptor) {
+          @Override
+          protected TableDescriptor call(MasterObserver observer) throws IOException {
+            return observer.preModifyTable(this, tableName, currentDescriptor, getResult());
+          }
+        });
   }
 
   public void postModifyTable(final TableName tableName, final TableDescriptor oldDescriptor,

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 835fc0d..82ec12d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -970,11 +970,12 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
   }
 
   @Override
-  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
-      TableDescriptor currentDesc, TableDescriptor newDesc) throws IOException {
+  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
+      TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc)
+      throws IOException {
     // TODO: potentially check if this is a add/modify/delete column operation
-    requirePermission(c, "modifyTable",
-        tableName, null, null, Action.ADMIN, Action.CREATE);
+    requirePermission(c, "modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
+    return newDesc;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
index 719fe33..1e83e96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
@@ -54,10 +54,11 @@ public class CoprocessorWhitelistMasterObserver implements MasterCoprocessor, Ma
   }
 
   @Override
-  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
       TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc)
-    throws IOException {
+      throws IOException {
     verifyCoprocessors(ctx, newDesc);
+    return newDesc;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 1f54afc..c4f3b95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -226,14 +226,15 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
   }
 
   @Override
-  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName,
-    TableDescriptor currentDescriptor, TableDescriptor newDescriptor) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
-    if (LABELS_TABLE_NAME.equals(tableName)) {
-      throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
+  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
+      throws IOException {
+    if (authorizationEnabled) {
+      if (LABELS_TABLE_NAME.equals(tableName)) {
+        throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
+      }
     }
+    return newDescriptor;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
index d8a5b4c..58f4b9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
@@ -373,10 +373,11 @@ public class TestMasterObserver {
     }
 
     @Override
-    public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
+    public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
         TableName tableName, final TableDescriptor currentDescriptor,
       final TableDescriptor newDescriptor) throws IOException {
       preModifyTableCalled = true;
+      return newDescriptor;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d90c87/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java
new file mode 100644
index 0000000..d23a4a8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserverToModifyTableSchema.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestMasterObserverToModifyTableSchema {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMasterObserverToModifyTableSchema.class);
+
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static TableName TABLENAME = TableName.valueOf("TestTable");
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        OnlyOneVersionAllowedMasterObserver.class.getName());
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMasterObserverToModifyTableSchema() throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
+    for (int i = 1; i <= 3; i++) {
+      builder.setColumnFamily(
+          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf" + i)).setMaxVersions(i)
+              .build());
+    }
+    try (Admin admin = UTIL.getAdmin()) {
+      admin.createTable(builder.build());
+      assertOneVersion(admin.getDescriptor(TABLENAME));
+
+      builder.modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf1"))
+          .setMaxVersions(Integer.MAX_VALUE).build());
+      admin.modifyTable(builder.build());
+      assertOneVersion(admin.getDescriptor(TABLENAME));
+    }
+  }
+
+  private void assertOneVersion(TableDescriptor td) {
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertEquals(1, cfd.getMaxVersions());
+    }
+  }
+
+  public static class OnlyOneVersionAllowedMasterObserver
+      implements MasterCoprocessor, MasterObserver {
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public TableDescriptor preCreateTableRegionsInfos(
+        ObserverContext<MasterCoprocessorEnvironment> ctx, TableDescriptor desc)
+        throws IOException {
+      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
+      for (ColumnFamilyDescriptor cfd : desc.getColumnFamilies()) {
+        builder.modifyColumnFamily(
+            ColumnFamilyDescriptorBuilder.newBuilder(cfd).setMaxVersions(1).build());
+      }
+      return builder.build();
+    }
+
+    @Override
+    public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
+        TableName tableName, final TableDescriptor currentDescriptor,
+        final TableDescriptor newDescriptor) throws IOException {
+      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(newDescriptor);
+      for (ColumnFamilyDescriptor cfd : newDescriptor.getColumnFamilies()) {
+        builder.modifyColumnFamily(
+            ColumnFamilyDescriptorBuilder.newBuilder(cfd).setMaxVersions(1).build());
+      }
+      return builder.build();
+    }
+  }
+}


[07/11] hbase git commit: HBASE-21582 If call HBaseAdmin#snapshotAsync but forget call isSnapshotFinished, then SnapshotHFileCleaner will skip to run every time

Posted by el...@apache.org.
HBASE-21582 If call HBaseAdmin#snapshotAsync but forget call isSnapshotFinished, then SnapshotHFileCleaner will skip to run every time


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f32d2618
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f32d2618
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f32d2618

Branch: refs/heads/HBASE-20952
Commit: f32d2618430f70e1b0db92785294b2c7892cc02b
Parents: 4640ff5
Author: huzheng <op...@gmail.com>
Authored: Tue Dec 11 20:27:56 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Thu Dec 13 10:35:20 2018 +0800

----------------------------------------------------------------------
 .../hbase/master/snapshot/SnapshotManager.java  | 48 ++++++++++++++------
 .../master/cleaner/TestSnapshotFromMaster.java  | 27 ++++++++++-
 .../master/snapshot/TestSnapshotManager.java    | 36 +++++++++++++--
 3 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 2b963b2..05db4ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -28,7 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -91,6 +95,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringP
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * This class manages the procedure of taking and restoring snapshots. There is only one
@@ -120,7 +126,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    * At this point, if the user asks for the snapshot/restore status, the result will be
    * snapshot done if exists or failed if it doesn't exists.
    */
-  private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
+  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
+      "hbase.snapshot.sentinels.cleanup.timeoutMillis";
+  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
 
   /** Enable or disable snapshot support */
   public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
@@ -151,7 +159,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
   // The map is always accessed and modified under the object lock using synchronized.
   // snapshotTable() will insert an Handler in the table.
   // isSnapshotDone() will remove the handler requested if the operation is finished.
-  private Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
+  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
+  private final ScheduledExecutorService scheduleThreadPool =
+      Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+          .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
+  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
 
   // Restore map, with table name as key, procedure ID as value.
   // The map is always accessed and modified under the object lock using synchronized.
@@ -181,17 +193,21 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    * @param coordinator procedure coordinator instance.  exposed for testing.
    * @param pool HBase ExecutorServcie instance, exposed for testing.
    */
-  public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
-      ProcedureCoordinator coordinator, ExecutorService pool)
+  @VisibleForTesting
+  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
+      ExecutorService pool, int sentinelCleanInterval)
       throws IOException, UnsupportedOperationException {
     this.master = master;
 
     this.rootDir = master.getMasterFileSystem().getRootDir();
-    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
+    Configuration conf = master.getConfiguration();
+    checkSnapshotSupport(conf, master.getMasterFileSystem());
 
     this.coordinator = coordinator;
     this.executorService = pool;
     resetTempDir();
+    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
+      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
   }
 
   /**
@@ -274,7 +290,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    *
    * @throws IOException if we can't reach the filesystem
    */
-  void resetTempDir() throws IOException {
+  private void resetTempDir() throws IOException {
     // cleanup any existing snapshots.
     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
         master.getConfiguration());
@@ -290,7 +306,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
    * @throws IOException For filesystem IOExceptions
    */
-  public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
+  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
     // check to see if it is completed
     if (!isSnapshotCompleted(snapshot)) {
       throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
@@ -934,7 +950,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
       this.restoreTableToProcIdMap.remove(tableName);
       return false;
     }
-
   }
 
   /**
@@ -989,14 +1004,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
    */
   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
     long currentTime = EnvironmentEdgeManager.currentTime();
-    Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
-        sentinels.entrySet().iterator();
+    long sentinelsCleanupTimeoutMillis =
+        master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
+          SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
+    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
       SnapshotSentinel sentinel = entry.getValue();
-      if (sentinel.isFinished() &&
-          (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
-      {
+      if (sentinel.isFinished()
+          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
         it.remove();
       }
     }
@@ -1031,7 +1047,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
       snapshotHandler.cancel(why);
     }
-
+    if (snapshotHandlerChoreCleanerTask != null) {
+      snapshotHandlerChoreCleanerTask.cancel(true);
+    }
     try {
       if (coordinator != null) {
         coordinator.close();
@@ -1166,6 +1184,8 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
     this.executorService = master.getExecutorService();
     resetTempDir();
+    snapshotHandlerChoreCleanerTask =
+        scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index 9d76ede..cc2ee06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -25,6 +26,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,8 +35,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.SnapshotType;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -129,11 +136,11 @@ public class TestSnapshotFromMaster {
     conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, "");
     // Enable snapshot
     conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 3 * 1000L);
     conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
       ConstantSizeRegionSplitPolicy.class.getName());
     conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
-
   }
 
   @Before
@@ -419,4 +426,22 @@ public class TestSnapshotFromMaster {
     builder.commit();
     return builder.getSnapshotDescription();
   }
+
+  @Test
+  public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
+    // Write some data
+    Table table = UTIL.getConnection().getTable(TABLE_NAME);
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes(i)).addColumn(TEST_FAM, Bytes.toBytes("q"), Bytes.toBytes(i));
+      table.put(put);
+    }
+    String snapshotName = "testAsyncSnapshotWillNotBlockSnapshotHFileCleaner01";
+    UTIL.getAdmin().snapshotAsync(new org.apache.hadoop.hbase.client.SnapshotDescription(
+        snapshotName, TABLE_NAME, SnapshotType.FLUSH));
+    Waiter.waitFor(UTIL.getConfiguration(), 10 * 1000L, 200L,
+      () -> UTIL.getAdmin().listSnapshots(Pattern.compile(snapshotName)).size() == 1);
+    assertTrue(master.getSnapshotManager().isTakingAnySnapshot());
+    Thread.sleep(11 * 1000L);
+    assertFalse(master.getSnapshotManager().isTakingAnySnapshot());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
index 3a6a61f..ff903c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.MetricsMaster;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
@@ -62,7 +61,6 @@ public class TestSnapshotManager {
   public TestName name = new TestName();
 
   MasterServices services = Mockito.mock(MasterServices.class);
-  MetricsMaster metrics = Mockito.mock(MetricsMaster.class);
   ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
   ExecutorService pool = Mockito.mock(ExecutorService.class);
   MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
@@ -79,14 +77,44 @@ public class TestSnapshotManager {
     return getNewManager(UTIL.getConfiguration());
   }
 
-  private SnapshotManager getNewManager(final Configuration conf)
+  private SnapshotManager getNewManager(Configuration conf) throws IOException, KeeperException {
+    return getNewManager(conf, 1);
+  }
+
+  private SnapshotManager getNewManager(Configuration conf, int intervalSeconds)
       throws IOException, KeeperException {
     Mockito.reset(services);
     Mockito.when(services.getConfiguration()).thenReturn(conf);
     Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
     Mockito.when(mfs.getFileSystem()).thenReturn(fs);
     Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir());
-    return new SnapshotManager(services, metrics, coordinator, pool);
+    return new SnapshotManager(services, coordinator, pool, intervalSeconds);
+  }
+
+  @Test
+  public void testCleanFinishedHandler() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Configuration conf = UTIL.getConfiguration();
+    try {
+      conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 5 * 1000L);
+      SnapshotManager manager = getNewManager(conf, 1);
+      TakeSnapshotHandler handler = Mockito.mock(TakeSnapshotHandler.class);
+      assertFalse("Manager is in process when there is no current handler",
+        manager.isTakingSnapshot(tableName));
+      manager.setSnapshotHandlerForTesting(tableName, handler);
+      Mockito.when(handler.isFinished()).thenReturn(false);
+      assertTrue(manager.isTakingAnySnapshot());
+      assertTrue("Manager isn't in process when handler is running",
+        manager.isTakingSnapshot(tableName));
+      Mockito.when(handler.isFinished()).thenReturn(true);
+      assertFalse("Manager is process when handler isn't running",
+        manager.isTakingSnapshot(tableName));
+      assertTrue(manager.isTakingAnySnapshot());
+      Thread.sleep(6 * 1000);
+      assertFalse(manager.isTakingAnySnapshot());
+    } finally {
+      conf.unset(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS);
+    }
   }
 
   @Test


[04/11] hbase git commit: HBASE-21453 Convert ReadOnlyZKClient to DEBUG instead of INFO

Posted by el...@apache.org.
HBASE-21453 Convert ReadOnlyZKClient to DEBUG instead of INFO


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f88224ee
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f88224ee
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f88224ee

Branch: refs/heads/HBASE-20952
Commit: f88224ee34ba2c23f794ec1219ffd93783b20e51
Parents: b09b87d
Author: Sakthi <ja...@cloudera.com>
Authored: Thu Nov 29 18:52:50 2018 -0800
Committer: Peter Somogyi <ps...@apache.org>
Committed: Tue Dec 11 08:18:02 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f88224ee/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
index fc2d5f0..09f8984 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
@@ -136,7 +136,7 @@ public final class ReadOnlyZKClient implements Closeable {
     this.retryIntervalMs =
         conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
     this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
-    LOG.info(
+    LOG.debug(
       "Connect {} to {} with session timeout={}ms, retries {}, " +
         "retry interval {}ms, keepAlive={}ms",
       getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
@@ -347,7 +347,7 @@ public final class ReadOnlyZKClient implements Closeable {
   @Override
   public void close() {
     if (closed.compareAndSet(false, true)) {
-      LOG.info("Close zookeeper connection {} to {}", getId(), connectString);
+      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
       tasks.add(CLOSE);
     }
   }


[05/11] hbase git commit: HBASE-21568 Use CacheConfig.DISABLED where we don't expect to have blockcache running

Posted by el...@apache.org.
HBASE-21568 Use CacheConfig.DISABLED where we don't expect to have blockcache running

This includes removing the "old way" of disabling blockcache in favor of the
new API.

Signed-off-by: Guanghao Zhang <zg...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67d6d508
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67d6d508
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67d6d508

Branch: refs/heads/HBASE-20952
Commit: 67d6d5084cf8fc094cda4bd3f091d8a0a9cb1d3e
Parents: f88224e
Author: Josh Elser <el...@apache.org>
Authored: Fri Dec 7 17:18:49 2018 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Dec 11 10:02:18 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java  | 6 ++----
 .../src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java  | 4 +---
 .../org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java   | 2 +-
 .../org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java    | 6 +++---
 .../java/org/apache/hadoop/hbase/util/CompressionTest.java     | 2 +-
 .../src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java  | 5 ++---
 .../apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java  | 2 +-
 7 files changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index c911e8c..274a506 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -414,8 +414,6 @@ public class HFileOutputFormat2
         DataBlockEncoding encoding = overriddenEncoding;
         encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
         encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
-        Configuration tempConf = new Configuration(conf);
-        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
         HFileContextBuilder contextBuilder = new HFileContextBuilder()
                                     .withCompression(compression)
                                     .withChecksumType(HStore.getChecksumType(conf))
@@ -430,12 +428,12 @@ public class HFileOutputFormat2
         HFileContext hFileContext = contextBuilder.build();
         if (null == favoredNodes) {
           wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
                   .withOutputDir(familydir).withBloomType(bloomType)
                   .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
         } else {
           wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
                   .withOutputDir(familydir).withBloomType(bloomType)
                   .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                   .withFavoredNodes(favoredNodes).build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 5bcaa17..78ebedc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -356,9 +356,7 @@ public class HFile {
    */
   public static final WriterFactory getWriterFactoryNoCache(Configuration
        conf) {
-    Configuration tempConf = new Configuration(conf);
-    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-    return HFile.getWriterFactory(conf, new CacheConfig(tempConf));
+    return HFile.getWriterFactory(conf, CacheConfig.DISABLED);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 82e881b..5a6f6c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -309,7 +309,7 @@ public class HFilePrettyPrinter extends Configured implements Tool {
       return -2;
     }
 
-    HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), true, getConf());
+    HFile.Reader reader = HFile.createReader(fs, file, CacheConfig.DISABLED, true, getConf());
 
     Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index e027ac6..3320b1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -710,7 +710,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     Path hfilePath = item.getFilePath();
     Optional<byte[]> first, last;
     try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
-      new CacheConfig(getConf()), true, getConf())) {
+      CacheConfig.DISABLED, true, getConf())) {
       hfr.loadFileInfo();
       first = hfr.getFirstRowKey();
       last = hfr.getLastRowKey();
@@ -847,7 +847,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           throws IOException {
         Path hfile = hfileStatus.getPath();
         try (HFile.Reader reader =
-            HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
+            HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
           if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
             builder.setCompressionType(reader.getFileContext().getCompression());
             LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
@@ -1083,7 +1083,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
       Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
     FileSystem fs = inFile.getFileSystem(conf);
-    CacheConfig cacheConf = new CacheConfig(conf);
+    CacheConfig cacheConf = CacheConfig.DISABLED;
     HalfStoreFileReader halfReader = null;
     StoreFileWriter halfWriter = null;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index b6af8a5..dcdd12e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -134,7 +134,7 @@ public class CompressionTest {
     writer.appendFileInfo(Bytes.toBytes("compressioninfokey"), Bytes.toBytes("compressioninfoval"));
     writer.close();
     Cell cc = null;
-    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
+    HFile.Reader reader = HFile.createReader(fs, path, CacheConfig.DISABLED, true, conf);
     try {
       reader.loadFileInfo();
       HFileScanner scanner = reader.getScanner(false, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 14706c5..8176942 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -922,7 +922,7 @@ public class HBaseFsck extends Configured implements Closeable {
             // For all the stores in this column family.
             for (FileStatus storeFile : storeFiles) {
               HFile.Reader reader = HFile.createReader(fs, storeFile.getPath(),
-                new CacheConfig(getConf()), true, getConf());
+                CacheConfig.DISABLED, true, getConf());
               if ((reader.getFirstKey() != null)
                   && ((storeFirstKey == null) || (comparator.compare(storeFirstKey,
                       ((KeyValue.KeyOnlyKeyValue) reader.getFirstKey().get()).getKey()) > 0))) {
@@ -1025,8 +1025,7 @@ public class HBaseFsck extends Configured implements Closeable {
         byte[] start, end;
         HFile.Reader hf = null;
         try {
-          CacheConfig cacheConf = new CacheConfig(getConf());
-          hf = HFile.createReader(fs, hfile.getPath(), cacheConf, true, getConf());
+          hf = HFile.createReader(fs, hfile.getPath(), CacheConfig.DISABLED, true, getConf());
           hf.loadFileInfo();
           Optional<Cell> startKv = hf.getFirstKey();
           start = CellUtil.cloneRow(startKv.get());

http://git-wip-us.apache.org/repos/asf/hbase/blob/67d6d508/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
index e937fa5..41f3cde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
@@ -82,7 +82,7 @@ public class HFileCorruptionChecker {
       boolean quarantine) throws IOException {
     this.conf = conf;
     this.fs = FileSystem.get(conf);
-    this.cacheConf = new CacheConfig(conf);
+    this.cacheConf = CacheConfig.DISABLED;
     this.executor = executor;
     this.inQuarantineMode = quarantine;
   }


[11/11] hbase git commit: HBASE-20952 run the extended tests weekly until branch activity picks up.

Posted by el...@apache.org.
HBASE-20952 run the extended tests weekly until branch activity picks up.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ebfc04d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ebfc04d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ebfc04d8

Branch: refs/heads/HBASE-20952
Commit: ebfc04d85e44d5af75afa68789ef1ae5e3b0ed35
Parents: 3ff274e
Author: Sean Busbey <bu...@apache.org>
Authored: Fri Nov 16 07:51:08 2018 -0600
Committer: Josh Elser <el...@apache.org>
Committed: Thu Dec 13 16:57:55 2018 -0500

----------------------------------------------------------------------
 dev-support/Jenkinsfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ebfc04d8/dev-support/Jenkinsfile
----------------------------------------------------------------------
diff --git a/dev-support/Jenkinsfile b/dev-support/Jenkinsfile
index b333afb..bea425a 100644
--- a/dev-support/Jenkinsfile
+++ b/dev-support/Jenkinsfile
@@ -21,7 +21,7 @@ pipeline {
     }
   }
   triggers {
-    cron('@daily')
+    cron('@weekly')
   }
   options {
     buildDiscarder(logRotator(numToKeepStr: '30'))


[02/11] hbase git commit: HBASE-21567 Allow overriding configs starting up the shell

Posted by el...@apache.org.
HBASE-21567 Allow overriding configs starting up the shell

Adds support for -D as option to 'hbase shell'


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da9508d4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da9508d4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da9508d4

Branch: refs/heads/HBASE-20952
Commit: da9508d4271ea12410e289692f10791b0e05266b
Parents: 79d90c8
Author: stack <st...@apache.org>
Authored: Thu Dec 6 23:05:21 2018 -0800
Committer: stack <st...@apache.org>
Committed: Sat Dec 8 15:08:19 2018 -0800

----------------------------------------------------------------------
 bin/hirb.rb                            | 40 ++++++++++++++++++++++++-----
 src/main/asciidoc/_chapters/shell.adoc | 16 ++++++++++++
 2 files changed, 49 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/da9508d4/bin/hirb.rb
----------------------------------------------------------------------
diff --git a/bin/hirb.rb b/bin/hirb.rb
index 790ecdc..e857db7 100644
--- a/bin/hirb.rb
+++ b/bin/hirb.rb
@@ -54,21 +54,47 @@ $LOAD_PATH.unshift Pathname.new(sources)
 cmdline_help = <<HERE # HERE document output as shell usage
 Usage: shell [OPTIONS] [SCRIPTFILE [ARGUMENTS]]
 
- -d | --debug                   Set DEBUG log levels.
- -h | --help                    This help.
- -n | --noninteractive          Do not run within an IRB session
-                                and exit with non-zero status on
-                                first error.
+ -d | --debug            Set DEBUG log levels.
+ -h | --help             This help.
+ -n | --noninteractive   Do not run within an IRB session and exit with non-zero
+                         status on first error.
+ -Dkey=value             Pass hbase-*.xml Configuration overrides. For example, to
+                         use an alternate zookeeper ensemble, pass:
+                           -Dhbase.zookeeper.quorum=zookeeper.example.org
+                         For faster fail, pass the below and vary the values:
+                           -Dhbase.client.retries.number=7
+                           -Dhbase.ipc.client.connect.max.retries=3
 HERE
+
+# Takes configuration and an arg that is expected to be key=value format.
+# If c is empty, creates one and returns it
+def add_to_configuration(c, arg)
+  kv = arg.split('=')
+  kv.length == 2 || (raise "Expected parameter #{kv} in key=value format")
+  c = org.apache.hadoop.hbase.HBaseConfiguration.create if c.nil?
+  c.set(kv[0], kv[1])
+  c
+end
+
 found = []
 script2run = nil
 log_level = org.apache.log4j.Level::ERROR
 @shell_debug = false
 interactive = true
-for arg in ARGV
+_configuration = nil
+D_ARG = '-D'
+while (arg = ARGV.shift)
   if arg == '-h' || arg == '--help'
     puts cmdline_help
     exit
+  elsif arg == D_ARG
+    argValue = ARGV.shift || (raise "#{D_ARG} takes a 'key=value' parameter")
+    _configuration = add_to_configuration(_configuration, argValue)
+    found.push(arg)
+    found.push(argValue)
+  elsif arg.start_with? D_ARG
+    _configuration = add_to_configuration(_configuration, arg[2..-1])
+    found.push(arg)
   elsif arg == '-d' || arg == '--debug'
     log_level = org.apache.log4j.Level::DEBUG
     $fullBackTrace = true
@@ -111,7 +137,7 @@ require 'shell'
 require 'shell/formatter'
 
 # Setup the HBase module.  Create a configuration.
-@hbase = Hbase::Hbase.new
+@hbase = _configuration.nil? ? Hbase::Hbase.new : Hbase::Hbase.new(_configuration)
 
 # Setup console
 @shell = Shell::Shell.new(@hbase, interactive)

http://git-wip-us.apache.org/repos/asf/hbase/blob/da9508d4/src/main/asciidoc/_chapters/shell.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/shell.adoc b/src/main/asciidoc/_chapters/shell.adoc
index 5612e1d..cdfa828 100644
--- a/src/main/asciidoc/_chapters/shell.adoc
+++ b/src/main/asciidoc/_chapters/shell.adoc
@@ -58,6 +58,7 @@ To run one of these files, do as follows:
 $ ./bin/hbase org.jruby.Main PATH_TO_SCRIPT
 ----
 
+
 == Running the Shell in Non-Interactive Mode
 
 A new non-interactive mode has been added to the HBase Shell (link:https://issues.apache.org/jira/browse/HBASE-11658[HBASE-11658)].
@@ -213,6 +214,21 @@ $ HBASE_SHELL_OPTS="-verbose:gc -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCD
   -XX:+PrintGCDetails -Xloggc:$HBASE_HOME/logs/gc-hbase.log" ./bin/hbase shell
 ----
 
+== Overriding configuration starting the HBase Shell
+
+As of hbase-2.0.5/hbase-2.1.3/hbase-2.2.0/hbase-1.4.10/hbase-1.5.0, you can
+pass or override hbase configuration as specified in `hbase-*.xml` by passing
+your key/values prefixed with `-D` on the command-line as follows:
+[source,bash]
+----
+$ ./bin/hbase shell -Dhbase.zookeeper.quorum=ZK0.remote.cluster.example.org,ZK1.remote.cluster.example.org,ZK2.remote.cluster.example.org -Draining=false
+...
+hbase(main):001:0> @shell.hbase.configuration.get("hbase.zookeeper.quorum")
+=> "ZK0.remote.cluster.example.org,ZK1.remote.cluster.example.org,ZK2.remote.cluster.example.org"
+hbase(main):002:0> @shell.hbase.configuration.get("raining")
+=> "false"
+----
+
 == Shell Tricks
 
 === Table variables


[06/11] hbase git commit: HIVE-21575 : memstore above high watermark message is logged too much

Posted by el...@apache.org.
HIVE-21575 : memstore above high watermark message is logged too much


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4640ff59
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4640ff59
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4640ff59

Branch: refs/heads/HBASE-20952
Commit: 4640ff5959af4865966126a503a7cd15e26a7408
Parents: 67d6d50
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 12 11:02:25 2018 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 12 11:02:25 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/regionserver/MemStoreFlusher.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4640ff59/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 699c9b6..804a2f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -703,6 +703,7 @@ class MemStoreFlusher implements FlushRequester {
     if (flushType != FlushType.NORMAL) {
       TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
       long start = EnvironmentEdgeManager.currentTime();
+      long nextLogTimeMs = start;
       synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
@@ -744,8 +745,11 @@ class MemStoreFlusher implements FlushRequester {
               LOG.warn("Interrupted while waiting");
               interrupted = true;
             }
-            long took = EnvironmentEdgeManager.currentTime() - start;
-            LOG.warn("Memstore is above high water mark and block " + took + "ms");
+            long nowMs = EnvironmentEdgeManager.currentTime();
+            if (nowMs >= nextLogTimeMs) {
+              LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
+              nextLogTimeMs = nowMs + 1000;
+            }
             flushType = isAboveHighWaterMark();
           }
         } finally {


[03/11] hbase git commit: HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator

Posted by el...@apache.org.
HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b09b87d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b09b87d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b09b87d1

Branch: refs/heads/HBASE-20952
Commit: b09b87d143730db00ec56114a752d3a74f8982c4
Parents: da9508d
Author: zhangduo <zh...@apache.org>
Authored: Tue Dec 11 08:39:43 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Tue Dec 11 14:51:26 2018 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncBufferedMutator.java      |  16 +-
 .../client/AsyncBufferedMutatorBuilder.java     |  19 +++
 .../client/AsyncBufferedMutatorBuilderImpl.java |  19 ++-
 .../hbase/client/AsyncBufferedMutatorImpl.java  |  67 +++++---
 .../client/AsyncConnectionConfiguration.java    |  37 +++--
 .../hbase/client/AsyncConnectionImpl.java       |  11 +-
 .../hbase/client/TestAsyncBufferMutator.java    | 161 ++++++++++++++++++-
 7 files changed, 277 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index 6fe4b9a..7b21eb5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -18,13 +18,16 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
 /**
  * Used to communicate with a single HBase table in batches. Obtain an instance from a
  * {@link AsyncConnection} and call {@link #close()} afterwards.
@@ -52,7 +55,9 @@ public interface AsyncBufferedMutator extends Closeable {
    * part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
    * @param mutation The data to send.
    */
-  CompletableFuture<Void> mutate(Mutation mutation);
+  default CompletableFuture<Void> mutate(Mutation mutation) {
+    return Iterables.getOnlyElement(mutate(Collections.singletonList(mutation)));
+  }
 
   /**
    * Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire
@@ -81,4 +86,11 @@ public interface AsyncBufferedMutator extends Closeable {
    * @return The size of the write buffer in bytes.
    */
   long getWriteBufferSize();
+
+  /**
+   * Returns the periodical flush interval, 0 means disabled.
+   */
+  default long getPeriodicalFlushTimeout(TimeUnit unit) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index 45959bb..c617c8e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -46,6 +46,25 @@ public interface AsyncBufferedMutatorBuilder {
   AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
 
   /**
+   * Set the periodical flush interval. If the data in the buffer has not been flush for a long
+   * time, i.e, reach this timeout limit, we will flush it automatically.
+   * <p/>
+   * Notice that, set the timeout to 0 or a negative value means disable periodical flush, not
+   * 'flush immediately'. If you want to flush immediately then you should not use this class, as it
+   * is designed to be 'buffered'.
+   */
+  default AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Disable the periodical flush, i.e, set the timeout to 0.
+   */
+  default AsyncBufferedMutatorBuilder disableWriteBufferPeriodicFlush() {
+    return setWriteBufferPeriodicFlush(0, TimeUnit.NANOSECONDS);
+  }
+
+  /**
    * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
    * <p>
    * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index 227d02b..eb8af17 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 /**
  * The implementation of {@link AsyncBufferedMutatorBuilder}.
@@ -28,14 +29,20 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
 
+  private final HashedWheelTimer periodicalFlushTimer;
+
   private final AsyncTableBuilder<?> tableBuilder;
 
   private long writeBufferSize;
 
+  private long periodicFlushTimeoutNs;
+
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
-      AsyncTableBuilder<?> tableBuilder) {
+      AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
+    this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
+    this.periodicalFlushTimer = periodicalFlushTimer;
   }
 
   @Override
@@ -77,8 +84,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
   }
 
   @Override
-  public AsyncBufferedMutator build() {
-    return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
+  public AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+    this.periodicFlushTimeoutNs = unit.toNanos(timeout);
+    return this;
   }
 
+  @Override
+  public AsyncBufferedMutator build() {
+    return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
+      periodicFlushTimeoutNs);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 5a92ace..318c6c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -29,16 +30,24 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
 /**
  * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
  */
 @InterfaceAudience.Private
 class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
+  private final HashedWheelTimer periodicalFlushTimer;
+
   private final AsyncTable<?> table;
 
   private final long writeBufferSize;
 
+  private final long periodicFlushTimeoutNs;
+
   private List<Mutation> mutations = new ArrayList<>();
 
   private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -47,9 +56,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   private boolean closed;
 
-  AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) {
+  @VisibleForTesting
+  Timeout periodicFlushTask;
+
+  AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+      long writeBufferSize, long periodicFlushTimeoutNs) {
+    this.periodicalFlushTimer = periodicalFlushTimer;
     this.table = table;
     this.writeBufferSize = writeBufferSize;
+    this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
   }
 
   @Override
@@ -62,7 +77,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
     return table.getConfiguration();
   }
 
-  private void internalFlush() {
+  // will be overridden in test
+  @VisibleForTesting
+  protected void internalFlush() {
+    if (periodicFlushTask != null) {
+      periodicFlushTask.cancel();
+      periodicFlushTask = null;
+    }
     List<Mutation> toSend = this.mutations;
     if (toSend.isEmpty()) {
       return;
@@ -86,29 +107,10 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   }
 
   @Override
-  public CompletableFuture<Void> mutate(Mutation mutation) {
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-    long heapSize = mutation.heapSize();
-    synchronized (this) {
-      if (closed) {
-        future.completeExceptionally(new IOException("Already closed"));
-        return future;
-      }
-      mutations.add(mutation);
-      futures.add(future);
-      bufferedSize += heapSize;
-      if (bufferedSize >= writeBufferSize) {
-        internalFlush();
-      }
-    }
-    return future;
-  }
-
-  @Override
   public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
     List<CompletableFuture<Void>> futures =
-        Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
-            .collect(Collectors.toList());
+      Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
+        .collect(Collectors.toList());
     long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
     synchronized (this) {
       if (closed) {
@@ -116,6 +118,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
         futures.forEach(f -> f.completeExceptionally(ioe));
         return futures;
       }
+      if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
+        periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
+          synchronized (AsyncBufferedMutatorImpl.this) {
+            // confirm that we are still valid, if there is already an internalFlush call before us,
+            // then we should not execute any more. And in internalFlush we will set periodicFlush
+            // to null, and since we may schedule a new one, so here we check whether the references
+            // are equal.
+            if (timeout == periodicFlushTask) {
+              periodicFlushTask = null;
+              internalFlush();
+            }
+          }
+        }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+      }
       this.mutations.addAll(mutations);
       this.futures.addAll(futures);
       bufferedSize += heapSize;
@@ -141,4 +157,9 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   public long getWriteBufferSize() {
     return writeBufferSize;
   }
+
+  @Override
+  public long getPeriodicalFlushTimeout(TimeUnit unit) {
+    return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index bd2add8..915e9dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,11 +39,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
 
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -91,32 +92,38 @@ class AsyncConnectionConfiguration {
 
   private final long writeBufferSize;
 
+  private final long writeBufferPeriodicFlushTimeoutNs;
+
   @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
-    this.rpcTimeoutNs = TimeUnit.MILLISECONDS
-        .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+    this.rpcTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
     this.readRpcTimeoutNs =
-        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
     this.writeRpcTimeoutNs =
-        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
     this.pauseNs =
-        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
     this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.startLogErrorsCnt =
-        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
     this.scanTimeoutNs = TimeUnit.MILLISECONDS
-        .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
-          HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
+      .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+        HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
     this.scannerCaching =
-        conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
-    this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
+      conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    this.metaScannerCaching =
+      conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
     this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
-    this.writeBufferSize =  conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+    this.writeBufferPeriodicFlushTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
+        WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
   }
 
   long getMetaOperationTimeoutNs() {
@@ -159,7 +166,7 @@ class AsyncConnectionConfiguration {
     return scannerCaching;
   }
 
-  int getMetaScannerCaching(){
+  int getMetaScannerCaching() {
     return metaScannerCaching;
   }
 
@@ -170,4 +177,8 @@ class AsyncConnectionConfiguration {
   long getWriteBufferSize() {
     return writeBufferSize;
   }
+
+  long getWriteBufferPeriodicFlushTimeoutNs() {
+    return writeBufferPeriodicFlushTimeoutNs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index a05764e..078395b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -68,7 +68,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @VisibleForTesting
   static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
-      Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
+    Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
 
   private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
 
@@ -193,7 +193,7 @@ class AsyncConnectionImpl implements AsyncConnection {
         String msg = "ZooKeeper available but no active master location found";
         LOG.info(msg);
         this.masterStubMakeFuture.getAndSet(null)
-            .completeExceptionally(new MasterNotRunningException(msg));
+          .completeExceptionally(new MasterNotRunningException(msg));
         return;
       }
       try {
@@ -216,7 +216,7 @@ class AsyncConnectionImpl implements AsyncConnection {
           });
       } catch (IOException e) {
         this.masterStubMakeFuture.getAndSet(null)
-            .completeExceptionally(new IOException("Failed to create async master stub", e));
+          .completeExceptionally(new IOException("Failed to create async master stub", e));
       }
     });
   }
@@ -317,12 +317,13 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
-    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName));
+    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
   }
 
   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
       ExecutorService pool) {
-    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
+    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
+      RETRY_TIMER);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 9fe4ca7..6eed326 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -45,12 +48,15 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncBufferMutator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
+    HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -96,10 +102,10 @@ public class TestAsyncBufferMutator {
   private void test(TableName tableName) throws InterruptedException {
     List<CompletableFuture<Void>> futures = new ArrayList<>();
     try (AsyncBufferedMutator mutator =
-        CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
+      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
       List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
-          .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
-          .collect(Collectors.toList()));
+        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+        .collect(Collectors.toList()));
       // exceeded the write buffer size, a flush will be called directly
       fs.forEach(f -> f.join());
       IntStream.range(COUNT / 2, COUNT).forEach(i -> {
@@ -115,9 +121,9 @@ public class TestAsyncBufferMutator {
     futures.forEach(f -> f.join());
     AsyncTable<?> table = CONN.getTable(tableName);
     IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
-        .forEach(r -> {
-          assertArrayEquals(VALUE, r.getValue(CF, CQ));
-        });
+      .forEach(r -> {
+        assertArrayEquals(VALUE, r.getValue(CF, CQ));
+      });
   }
 
   @Test
@@ -142,4 +148,145 @@ public class TestAsyncBufferMutator {
       }
     }
   }
+
+  @Test
+  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
+    try (AsyncBufferedMutator mutator =
+      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
+      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+      CompletableFuture<?> future = mutator.mutate(put);
+      Thread.sleep(2000);
+      // assert that we have not flushed it out
+      assertFalse(future.isDone());
+      mutator.flush();
+      future.get();
+    }
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+  }
+
+  @Test
+  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
+    AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
+      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    CompletableFuture<?> future = mutator.mutate(put);
+    future.get();
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+  }
+
+  // a bit deep into the implementation
+  @Test
+  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
+      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
+      .setWriteBufferSize(10 * put.heapSize()).build()) {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      futures.add(mutator.mutate(put));
+      Timeout task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+      for (int i = 1;; i++) {
+        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
+        if (mutator.periodicFlushTask == null) {
+          break;
+        }
+      }
+      assertTrue(task.isCancelled());
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+      for (int i = 0; i < futures.size(); i++) {
+        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
+      }
+    }
+  }
+
+  @Test
+  public void testCancelPeriodicFlushByManuallyFlush()
+      throws InterruptedException, ExecutionException {
+    try (AsyncBufferedMutatorImpl mutator =
+      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+      CompletableFuture<?> future =
+        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+      Timeout task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+      mutator.flush();
+      assertTrue(task.isCancelled());
+      future.get();
+      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+    }
+  }
+
+  @Test
+  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
+    CompletableFuture<?> future;
+    Timeout task;
+    try (AsyncBufferedMutatorImpl mutator =
+      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+      task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+    }
+    assertTrue(task.isCancelled());
+    future.get();
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+  }
+
+  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
+
+    private int flushCount;
+
+    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+        long writeBufferSize, long periodicFlushTimeoutNs) {
+      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
+    }
+
+    @Override
+    protected void internalFlush() {
+      flushCount++;
+      super.internalFlush();
+    }
+  }
+
+  @Test
+  public void testRaceBetweenNormalFlushAndPeriodicFlush()
+      throws InterruptedException, ExecutionException {
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    try (AsyncBufferMutatorForTest mutator =
+      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+      CompletableFuture<?> future = mutator.mutate(put);
+      Timeout task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+      synchronized (mutator) {
+        // synchronized on mutator to prevent periodic flush to be executed
+        Thread.sleep(500);
+        // the timeout should be issued
+        assertTrue(task.isExpired());
+        // but no flush is issued as we hold the lock
+        assertEquals(0, mutator.flushCount);
+        assertFalse(future.isDone());
+        // manually flush, then release the lock
+        mutator.flush();
+      }
+      // this is a bit deep into the implementation in netty but anyway let's add a check here to
+      // confirm that an issued timeout can not be canceled by netty framework.
+      assertFalse(task.isCancelled());
+      // and the mutation is done
+      future.get();
+      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+      // only the manual flush, the periodic flush should have been canceled by us
+      assertEquals(1, mutator.flushCount);
+    }
+  }
 }


[08/11] hbase git commit: HBASE-21590 Optimize trySkipToNextColumn in StoreScanner a bit.

Posted by el...@apache.org.
HBASE-21590 Optimize trySkipToNextColumn in StoreScanner a bit.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb1966dc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb1966dc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb1966dc

Branch: refs/heads/HBASE-20952
Commit: cb1966dc2d94fba10d9b6af3c5719e03f621df92
Parents: f32d261
Author: Lars Hofhansl <la...@apache.org>
Authored: Thu Dec 13 11:57:16 2018 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Thu Dec 13 11:57:16 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/regionserver/StoreScanner.java  | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cb1966dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 736c08a..e7a4528 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -802,12 +802,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   @VisibleForTesting
   protected boolean trySkipToNextRow(Cell cell) throws IOException {
     Cell nextCell = null;
+    // used to guard against a changed next indexed key by doing a identity comparison
+    // when the identity changes we need to compare the bytes again
+    Cell previousIndexedKey = null;
     do {
       Cell nextIndexedKey = getNextIndexedKey();
       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
-          && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
+          && (nextIndexedKey == previousIndexedKey || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) {
         this.heap.next();
         ++kvsScanned;
+        previousIndexedKey = nextIndexedKey;
       } else {
         return false;
       }
@@ -823,12 +827,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   @VisibleForTesting
   protected boolean trySkipToNextColumn(Cell cell) throws IOException {
     Cell nextCell = null;
+    // used to guard against a changed next indexed key by doing a identity comparison
+    // when the identity changes we need to compare the bytes again
+    Cell previousIndexedKey = null;
     do {
       Cell nextIndexedKey = getNextIndexedKey();
       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
-          && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
+          && (nextIndexedKey == previousIndexedKey || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) {
         this.heap.next();
         ++kvsScanned;
+        previousIndexedKey = nextIndexedKey;
       } else {
         return false;
       }


[10/11] hbase git commit: HBASE-21575 : memstore above high watermark message is logged too much

Posted by el...@apache.org.
HBASE-21575 : memstore above high watermark message is logged too much


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ff274e2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ff274e2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ff274e2

Branch: refs/heads/HBASE-20952
Commit: 3ff274e22eb5710f4301fb0fce364e22a11288d7
Parents: 9a25d0c
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 12 11:02:25 2018 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Dec 13 12:47:11 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/regionserver/MemStoreFlusher.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff274e2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 699c9b6..804a2f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -703,6 +703,7 @@ class MemStoreFlusher implements FlushRequester {
     if (flushType != FlushType.NORMAL) {
       TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
       long start = EnvironmentEdgeManager.currentTime();
+      long nextLogTimeMs = start;
       synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
@@ -744,8 +745,11 @@ class MemStoreFlusher implements FlushRequester {
               LOG.warn("Interrupted while waiting");
               interrupted = true;
             }
-            long took = EnvironmentEdgeManager.currentTime() - start;
-            LOG.warn("Memstore is above high water mark and block " + took + "ms");
+            long nowMs = EnvironmentEdgeManager.currentTime();
+            if (nowMs >= nextLogTimeMs) {
+              LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
+              nextLogTimeMs = nowMs + 1000;
+            }
             flushType = isAboveHighWaterMark();
           }
         } finally {


[09/11] hbase git commit: Revert "HIVE-21575 : memstore above high watermark message is logged too much"

Posted by el...@apache.org.
Revert "HIVE-21575 : memstore above high watermark message is logged too much"

This reverts commit 4640ff5959af4865966126a503a7cd15e26a7408.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a25d0c2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a25d0c2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a25d0c2

Branch: refs/heads/HBASE-20952
Commit: 9a25d0c249e595a1f8aef41cd677b44ff1c72d73
Parents: cb1966d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Dec 13 12:46:39 2018 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Dec 13 12:46:39 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/regionserver/MemStoreFlusher.java    | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9a25d0c2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 804a2f8..699c9b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -703,7 +703,6 @@ class MemStoreFlusher implements FlushRequester {
     if (flushType != FlushType.NORMAL) {
       TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
       long start = EnvironmentEdgeManager.currentTime();
-      long nextLogTimeMs = start;
       synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
@@ -745,11 +744,8 @@ class MemStoreFlusher implements FlushRequester {
               LOG.warn("Interrupted while waiting");
               interrupted = true;
             }
-            long nowMs = EnvironmentEdgeManager.currentTime();
-            if (nowMs >= nextLogTimeMs) {
-              LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
-              nextLogTimeMs = nowMs + 1000;
-            }
+            long took = EnvironmentEdgeManager.currentTime() - start;
+            LOG.warn("Memstore is above high water mark and block " + took + "ms");
             flushType = isAboveHighWaterMark();
           }
         } finally {