Posted to commits@hbase.apache.org by sy...@apache.org on 2015/10/28 22:57:16 UTC

[1/8] hbase git commit: HBASE-13014 Java Tool For Region Moving (Abhishek Singh Chouhan)

Repository: hbase
Updated Branches:
  refs/heads/hbase-12439 d5d81d675 -> 4b018d2a3


HBASE-13014 Java Tool For Region Moving (Abhishek Singh Chouhan)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/939697b4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/939697b4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/939697b4

Branch: refs/heads/hbase-12439
Commit: 939697b415201348ff4523321e316dfaf2206630
Parents: d5d81d6
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue Oct 27 22:04:51 2015 +0000
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Oct 27 22:46:15 2015 +0000

----------------------------------------------------------------------
 .../apache/hadoop/hbase/util/RegionMover.java   | 988 +++++++++++++++++++
 .../hadoop/hbase/util/TestRegionMover.java      | 175 ++++
 2 files changed, 1163 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/939697b4/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java
new file mode 100644
index 0000000..d2a99bd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java
@@ -0,0 +1,988 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Tool for loading/unloading regions to/from the given regionserver. It can be run from the
+ * command line directly as a utility, and supports ack and noAck modes for loading/unloading.
+ * Ack mode verifies that regions are online after movement, while noAck mode is a best-effort
+ * mode that improves performance but will move on even if a region is stuck or not moved.
+ * The motivation behind noAck mode is RS shutdown: even if a region is stuck, the master will
+ * move it anyway upon shutdown. The tool can also be used programmatically by constructing an
+ * object with the builder and then calling {@link #load()} or {@link #unload()} as desired.
+ */
+@InterfaceAudience.Public
+public class RegionMover extends AbstractHBaseTool {
+  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
+  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
+  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
+  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
+  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
+  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
+  static final Log LOG = LogFactory.getLog(RegionMover.class);
+  private RegionMoverBuilder rmbuilder;
+  private boolean ack = true;
+  private int maxthreads = 1;
+  private int timeout;
+  private String loadUnload;
+  private String hostname;
+  private String filename;
+  private String excludeFile;
+  private int port;
+
+  private RegionMover(RegionMoverBuilder builder) {
+    this.hostname = builder.hostname;
+    this.filename = builder.filename;
+    this.excludeFile = builder.excludeFile;
+    this.maxthreads = builder.maxthreads;
+    this.ack = builder.ack;
+    this.port = builder.port;
+    this.timeout = builder.timeout;
+  }
+
+  private RegionMover() {
+  }
+
+  /**
+   * Builder for RegionMover. Use the {@link #build()} method to create the {@link RegionMover}
+   * object. Use {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
+   * {@link #ack(boolean)} and {@link #timeout(int)} to set the corresponding options.
+   */
+  public static class RegionMoverBuilder {
+    private boolean ack = true;
+    private int maxthreads = 1;
+    private int timeout = Integer.MAX_VALUE;
+    private String hostname;
+    private String filename;
+    private String excludeFile = null;
+    private String defaultDir = "/tmp";
+    private int port = HConstants.DEFAULT_REGIONSERVER_PORT;
+
+    /**
+     * Hostname to unload regions from or load regions to. Valid formats: <hostname> or
+     * <hostname:port>
+     * @param hostname hostname or hostname:port of the region server
+     */
+    public RegionMoverBuilder(String hostname) {
+      String[] splitHostname = hostname.split(":");
+      this.hostname = splitHostname[0];
+      if (splitHostname.length == 2) {
+        this.port = Integer.parseInt(splitHostname[1]);
+      }
+      setDefaultfilename(this.hostname);
+    }
+
+    private void setDefaultfilename(String hostname) {
+      this.filename =
+          defaultDir + "/" + System.getProperty("user.name") + this.hostname + ":"
+              + Integer.toString(this.port);
+    }
+
+    /**
+     * Path of file where regions will be written to during unloading/read from during loading
+     * @param filename
+     * @return RegionMoverBuilder object
+     */
+    public RegionMoverBuilder filename(String filename) {
+      this.filename = filename;
+      return this;
+    }
+
+    /**
+     * Set the max number of threads that will be used to move regions
+     * @param threads
+     * @return RegionMoverBuilder object
+     */
+    public RegionMoverBuilder maxthreads(int threads) {
+      this.maxthreads = threads;
+      return this;
+    }
+
+    /**
+     * Path of the file containing hostnames to be excluded during region movement. The exclude
+     * file should have one <hostname:port> per line. Port is mandatory here since we can have
+     * multiple RSs running on a single host.
+     * @param excludefile
+     * @return RegionMoverBuilder object
+     */
+    public RegionMoverBuilder excludeFile(String excludefile) {
+      this.excludeFile = excludefile;
+      return this;
+    }
+
+    /**
+     * Set ack/noAck mode.
+     * <p>
+     * In ack mode regions are acknowledged before and after moving, and the move is retried
+     * hbase.move.retries.max times; if still unsuccessful we quit with exit code 1. NoAck mode is
+     * a best-effort mode where each region movement is tried once. It can be used during graceful
+     * shutdown since even if we have a stuck region, it will be reassigned anyway upon shutdown.
+     * <p>
+     * @param ack
+     * @return RegionMoverBuilder object
+     */
+    public RegionMoverBuilder ack(boolean ack) {
+      this.ack = ack;
+      return this;
+    }
+
+    /**
+     * Set the timeout for the load/unload operation in seconds. This is a global timeout; the
+     * thread pool for movers also has a separate timeout of hbase.move.wait.max * number of
+     * regions to load/unload.
+     * @param timeout in seconds
+     * @return RegionMoverBuilder object
+     */
+    public RegionMoverBuilder timeout(int timeout) {
+      this.timeout = timeout;
+      return this;
+    }
+
+    /**
+     * This method builds the appropriate RegionMover object which can then be used to load/unload
+     * using load and unload methods
+     * @return RegionMover object
+     */
+    public RegionMover build() {
+      return new RegionMover(this);
+    }
+  }
+
+  /**
+   * Loads the specified {@link #hostname} with regions listed in the {@link #filename}. The
+   * RegionMover object has to be created using {@link #RegionMover(RegionMoverBuilder)}.
+   * @return true if loading succeeded, false otherwise
+   * @throws ExecutionException
+   * @throws InterruptedException if the loader thread was interrupted
+   * @throws TimeoutException
+   */
+  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
+    setConf();
+    ExecutorService loadPool = Executors.newFixedThreadPool(1);
+    Future<Boolean> loadTask = loadPool.submit(new Load(this));
+    loadPool.shutdown();
+    try {
+      if (!loadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
+        LOG.warn("Timed out before finishing the loading operation. Timeout:" + this.timeout
+            + "sec");
+        loadPool.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      loadPool.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    try {
+      return loadTask.get(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while loading Regions on " + this.hostname, e);
+      throw e;
+    } catch (ExecutionException e) {
+      LOG.error("Error while loading regions on RegionServer " + this.hostname, e);
+      throw e;
+    }
+  }
+
+  private class Load implements Callable<Boolean> {
+
+    private RegionMover rm;
+
+    public Load(RegionMover rm) {
+      this.rm = rm;
+    }
+
+    @Override
+    public Boolean call() throws IOException {
+      Connection conn = ConnectionFactory.createConnection(rm.conf);
+      try {
+        List<HRegionInfo> regionsToMove = readRegionsFromFile(rm.filename);
+        if (regionsToMove.isEmpty()) {
+          LOG.info("No regions to load.Exiting");
+          return true;
+        }
+        Admin admin = conn.getAdmin();
+        try {
+          loadRegions(admin, rm.hostname, rm.port, regionsToMove, rm.ack);
+        } finally {
+          admin.close();
+        }
+      } catch (Exception e) {
+        LOG.error("Error while loading regions to " + rm.hostname, e);
+        return false;
+      } finally {
+        conn.close();
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Unload regions from the given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.
+   * In noAck mode we do not make sure that the region is successfully online on the target region
+   * server, hence it is best effort. We do not unload regions to hostnames given in
+   * {@link #excludeFile}.
+   * @return true if unloading succeeded, false otherwise
+   * @throws InterruptedException if the unloader thread was interrupted
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
+    setConf();
+    deleteFile(this.filename);
+    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
+    Future<Boolean> unloadTask = unloadPool.submit(new Unload(this));
+    unloadPool.shutdown();
+    try {
+      if (!unloadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
+        LOG.warn("Timed out before finishing the unloading operation. Timeout:" + this.timeout
+            + "sec");
+        unloadPool.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      unloadPool.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    try {
+      return unloadTask.get(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while unloading Regions from " + this.hostname, e);
+      throw e;
+    } catch (ExecutionException e) {
+      LOG.error("Error while unloading regions from RegionServer " + this.hostname, e);
+      throw e;
+    }
+  }
+
+  private class Unload implements Callable<Boolean> {
+
+    List<HRegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<HRegionInfo>());
+    private RegionMover rm;
+
+    public Unload(RegionMover rm) {
+      this.rm = rm;
+    }
+
+    @Override
+    public Boolean call() throws IOException {
+      Connection conn = ConnectionFactory.createConnection(rm.conf);
+      try {
+        Admin admin = conn.getAdmin();
+        // Get Online RegionServers
+        ArrayList<String> regionServers = getServers(admin);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Online region servers:" + regionServers.toString());
+        }
+        // Remove the host Region server from target Region Servers list
+        String server = stripServer(regionServers, hostname, port);
+        // Remove RS present in the exclude file
+        stripExcludes(regionServers, rm.excludeFile);
+        stripMaster(regionServers, admin);
+        unloadRegions(admin, server, regionServers, rm.ack, movedRegions);
+      } catch (Exception e) {
+        LOG.error("Error while unloading regions ", e);
+        return false;
+      } finally {
+        try {
+          conn.close();
+        } catch (IOException e) {
+          // ignore
+        }
+        if (movedRegions != null) {
+          writeFile(rm.filename, movedRegions);
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Creates a new configuration if not already set and sets region mover specific overrides
+   */
+  private void setConf() {
+    if (conf == null) {
+      conf = HBaseConfiguration.create();
+      conf.setInt("hbase.client.prefetch.limit", 1);
+      conf.setInt("hbase.client.pause", 500);
+      conf.setInt("hbase.client.retries.number", 100);
+    }
+  }
+
+  private void loadRegions(Admin admin, String hostname, int port,
+      List<HRegionInfo> regionsToMove, boolean ack) throws Exception {
+    String server = null;
+    List<HRegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<HRegionInfo>());
+    int maxWaitInSeconds =
+        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
+    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
+    while ((EnvironmentEdgeManager.currentTime() < maxWait) && (server == null)) {
+      try {
+        ArrayList<String> regionServers = getServers(admin);
+        // Remove the host Region server from target Region Servers list
+        server = stripServer(regionServers, hostname, port);
+        if (server != null) {
+          break;
+        }
+      } catch (IOException e) {
+        LOG.warn("Could not get list of region servers", e);
+      } catch (Exception e) {
+        LOG.info("hostname=" + hostname + " is not up yet, waiting", e);
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while waiting for " + hostname + " to be up.Quitting now", e);
+        throw e;
+      }
+    }
+    if (server == null) {
+      LOG.error("Host:" + hostname + " is not up.Giving up.");
+      throw new Exception("Host to load regions not online");
+    }
+    LOG.info("Moving" + regionsToMove.size() + " regions to " + server + " using "
+        + this.maxthreads + " threads.Ack mode:" + this.ack);
+    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
+    List<Future<Boolean>> taskList = new ArrayList<Future<Boolean>>();
+    int counter = 0;
+    while (counter < regionsToMove.size()) {
+      HRegionInfo region = regionsToMove.get(counter);
+      String currentServer = getServerNameForRegion(admin, region);
+      if (currentServer == null) {
+        LOG.warn("Could not get server for Region:" + region.getEncodedName() + " moving on");
+        counter++;
+        continue;
+      } else if (server.equals(currentServer)) {
+        LOG.info("Region " + region.getRegionNameAsString() + "already on target server=" + server);
+        counter++;
+        continue;
+      }
+      if (ack) {
+        Future<Boolean> task =
+            moveRegionsPool.submit(new MoveWithAck(admin, region, currentServer, server,
+                movedRegions));
+        taskList.add(task);
+      } else {
+        Future<Boolean> task =
+            moveRegionsPool.submit(new MoveWithoutAck(admin, region, currentServer, server,
+                movedRegions));
+        taskList.add(task);
+      }
+      counter++;
+    }
+    moveRegionsPool.shutdown();
+    long timeoutInSeconds =
+        regionsToMove.size()
+            * admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
+    try {
+      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
+        moveRegionsPool.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      moveRegionsPool.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    for (Future<Boolean> future : taskList) {
+      try {
+        // if even after shutdownNow threads are stuck we wait for 5 secs max
+        if (!future.get(5, TimeUnit.SECONDS)) {
+          LOG.error("Was Not able to move region....Exiting Now");
+          throw new Exception("Could not move region Exception");
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
+        throw e;
+      } catch (ExecutionException e) {
+        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
+        throw e;
+      } catch (CancellationException e) {
+        LOG.error("Thread for moving region cancelled. Timeout for cancellation:"
+            + timeoutInSeconds + "secs", e);
+        throw e;
+      }
+    }
+  }
+
+  private void unloadRegions(Admin admin, String server, ArrayList<String> regionServers,
+      boolean ack, List<HRegionInfo> movedRegions) throws Exception {
+    List<HRegionInfo> regionsToMove = new ArrayList<HRegionInfo>();
+    regionsToMove = getRegions(this.conf, server);
+    if (regionsToMove.size() == 0) {
+      LOG.info("No Regions to move....Quitting now");
+      return;
+    } else if (regionServers.size() == 0) {
+      LOG.warn("No Regions were moved - no servers available");
+      throw new Exception("No online region servers");
+    }
+    while (true) {
+      regionsToMove = getRegions(this.conf, server);
+      regionsToMove.removeAll(movedRegions);
+      if (regionsToMove.size() == 0) {
+        break;
+      }
+      int counter = 0;
+      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
+          + regionServers.size() + " servers using " + this.maxthreads + " threads. Ack mode:"
+          + ack);
+      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
+      List<Future<Boolean>> taskList = new ArrayList<Future<Boolean>>();
+      int serverIndex = 0;
+      while (counter < regionsToMove.size()) {
+        if (ack) {
+          Future<Boolean> task =
+              moveRegionsPool.submit(new MoveWithAck(admin, regionsToMove.get(counter), server,
+                  regionServers.get(serverIndex), movedRegions));
+          taskList.add(task);
+        } else {
+          Future<Boolean> task =
+              moveRegionsPool.submit(new MoveWithoutAck(admin, regionsToMove.get(counter), server,
+                  regionServers.get(serverIndex), movedRegions));
+          taskList.add(task);
+        }
+        counter++;
+        serverIndex = (serverIndex + 1) % regionServers.size();
+      }
+      moveRegionsPool.shutdown();
+      long timeoutInSeconds =
+          regionsToMove.size()
+              * admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
+      try {
+        if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
+          moveRegionsPool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        moveRegionsPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      for (Future<Boolean> future : taskList) {
+        try {
+          // if even after shutdownNow threads are stuck we wait for 5 secs max
+          if (!future.get(5, TimeUnit.SECONDS)) {
+            LOG.error("Was Not able to move region....Exiting Now");
+            throw new Exception("Could not move region Exception");
+          }
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
+          throw e;
+        } catch (ExecutionException e) {
+          LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
+          throw e;
+        } catch (CancellationException e) {
+          LOG.error("Thread for moving region cancelled. Timeout for cancellation:"
+              + timeoutInSeconds + "secs", e);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Move regions and make sure that they are up on the target server. If a region movement fails
+   * we exit as failure.
+   */
+  private class MoveWithAck implements Callable<Boolean> {
+    private Admin admin;
+    private HRegionInfo region;
+    private String targetServer;
+    private List<HRegionInfo> movedRegions;
+    private String sourceServer;
+
+    public MoveWithAck(Admin admin, HRegionInfo regionInfo, String sourceServer,
+        String targetServer, List<HRegionInfo> movedRegions) {
+      this.admin = admin;
+      this.region = regionInfo;
+      this.targetServer = targetServer;
+      this.movedRegions = movedRegions;
+      this.sourceServer = sourceServer;
+    }
+
+    @Override
+    public Boolean call() throws IOException, InterruptedException {
+      boolean moved = false;
+      int count = 0;
+      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
+      int maxWaitInSeconds =
+          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
+      long startTime = EnvironmentEdgeManager.currentTime();
+      boolean sameServer = true;
+      // Assert we can scan the region in its current location
+      isSuccessfulScan(admin, region);
+      LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to "
+          + targetServer);
+      while (count < retries && sameServer) {
+        if (count > 0) {
+          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
+        }
+        count = count + 1;
+        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
+        long maxWait = startTime + (maxWaitInSeconds * 1000);
+        while (EnvironmentEdgeManager.currentTime() < maxWait) {
+          sameServer = isSameServer(admin, region, sourceServer);
+          if (!sameServer) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+      }
+      if (sameServer) {
+        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
+            + ",newServer=" + this.targetServer);
+      } else {
+        isSuccessfulScan(admin, region);
+        LOG.info("Moved Region "
+            + region.getRegionNameAsString()
+            + " cost:"
+            + String.format("%.3f",
+              (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
+        moved = true;
+        movedRegions.add(region);
+      }
+      return moved;
+    }
+  }
+
+  /**
+   * Move regions without acknowledging. Useful in case of RS shutdown, as we might want to shut
+   * the RS down anyway and not abort on a stuck region. Improves movement performance.
+   */
+  private class MoveWithoutAck implements Callable<Boolean> {
+    private Admin admin;
+    private HRegionInfo region;
+    private String targetServer;
+    private List<HRegionInfo> movedRegions;
+    private String sourceServer;
+
+    public MoveWithoutAck(Admin admin, HRegionInfo regionInfo, String sourceServer,
+        String targetServer, List<HRegionInfo> movedRegions) {
+      this.admin = admin;
+      this.region = regionInfo;
+      this.targetServer = targetServer;
+      this.movedRegions = movedRegions;
+      this.sourceServer = sourceServer;
+    }
+
+    @Override
+    public Boolean call() {
+      try {
+        LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to "
+            + targetServer);
+        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
+        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
+            + targetServer);
+      } catch (Exception e) {
+        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
+      } finally {
+        // we add region to the moved regions list in No Ack Mode since this is best effort
+        movedRegions.add(region);
+      }
+      return true;
+    }
+  }
+
+  private List<HRegionInfo> readRegionsFromFile(String filename) throws IOException {
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    File f = new File(filename);
+    if (!f.exists()) {
+      return regions;
+    }
+    FileInputStream fis = null;
+    DataInputStream dis = null;
+    try {
+      fis = new FileInputStream(f);
+      dis = new DataInputStream(fis);
+      int numRegions = dis.readInt();
+      int index = 0;
+      while (index < numRegions) {
+        regions.add(HRegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
+        index++;
+      }
+    } catch (IOException e) {
+      LOG.error("Error while reading regions from file:" + filename, e);
+      throw e;
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
+      if (fis != null) {
+        fis.close();
+      }
+    }
+    return regions;
+  }
+
+  /**
+   * Get online regions of the passed server
+   * @param conf
+   * @param server
+   * @return List of Regions online on the server
+   * @throws IOException
+   */
+  private List<HRegionInfo> getRegions(Configuration conf, String server) throws IOException {
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      return conn.getAdmin().getOnlineRegions(ServerName.valueOf(server));
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Write the number of regions moved in the first line followed by regions moved in subsequent
+   * lines
+   * @param filename
+   * @param movedRegions
+   * @throws IOException
+   */
+  private void writeFile(String filename, List<HRegionInfo> movedRegions) throws IOException {
+    FileOutputStream fos = null;
+    DataOutputStream dos = null;
+    try {
+      fos = new FileOutputStream(filename);
+      dos = new DataOutputStream(fos);
+      dos.writeInt(movedRegions.size());
+      for (HRegionInfo region : movedRegions) {
+        Bytes.writeByteArray(dos, region.toByteArray());
+      }
+    } catch (IOException e) {
+      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
+          + movedRegions.size() + " regions", e);
+      throw e;
+    } finally {
+      if (dos != null) {
+        dos.close();
+      }
+      if (fos != null) {
+        fos.close();
+      }
+    }
+  }
+
+  /**
+   * Excludes the servername whose hostname and port portion matches the list given in exclude file
+   * @param regionServers
+   * @param excludeFile
+   * @throws IOException
+   */
+  private void stripExcludes(ArrayList<String> regionServers, String excludeFile)
+      throws IOException {
+    if (excludeFile != null) {
+      ArrayList<String> excludes = readExcludes(excludeFile);
+      Iterator<String> i = regionServers.iterator();
+      while (i.hasNext()) {
+        String rs = i.next();
+        String rsPort =
+            rs.split(ServerName.SERVERNAME_SEPARATOR)[0] + ":"
+                + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
+        if (excludes.contains(rsPort)) {
+          i.remove();
+        }
+      }
+      LOG.info("Valid Region server targets are:" + regionServers.toString());
+      LOG.info("Excluded Servers are" + excludes.toString());
+    }
+  }
+  
+  /**
+   * Exclude master from list of RSs to move regions to
+   * @param regionServers
+   * @param admin
+   * @throws Exception
+   */
+  private void stripMaster(ArrayList<String> regionServers, Admin admin) throws Exception {
+      stripServer(regionServers, admin.getClusterStatus().getMaster().getHostname(),
+        admin.getClusterStatus().getMaster().getPort());
+  }
+
+  /**
+   * Create an ArrayList of servers listed in the exclude file
+   * @param excludeFile
+   * @return ArrayList of servers to be excluded in format <hostname:port>
+   * @throws IOException
+   */
+  private ArrayList<String> readExcludes(String excludeFile) throws IOException {
+    ArrayList<String> excludeServers = new ArrayList<String>();
+    if (excludeFile == null) {
+      return excludeServers;
+    } else {
+      File f = new File(excludeFile);
+      String line;
+      BufferedReader br = null;
+      try {
+        br = new BufferedReader(new FileReader(f));
+        while ((line = br.readLine()) != null) {
+          line = line.trim();
+          if (!line.equals("")) {
+            excludeServers.add(line);
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Exception while reading excludes file,continuing anyways", e);
+      } finally {
+        if (br != null) {
+          br.close();
+        }
+      }
+      return excludeServers;
+    }
+  }
+
+  /**
+   * Remove the servername whose hostname and port portion matches from the passed array of servers.
+   * Returns as side-effect the servername removed.
+   * @param regionServers
+   * @param hostname
+   * @param port
+   * @return server removed from list of Region Servers
+   * @throws Exception
+   */
+  private String stripServer(ArrayList<String> regionServers, String hostname, int port)
+      throws Exception {
+    String server = null;
+    String portString = Integer.toString(port);
+    Iterator<String> i = regionServers.iterator();
+    int noOfRs = regionServers.size();
+    while (i.hasNext()) {
+      server = i.next();
+      String[] splitServer = server.split(ServerName.SERVERNAME_SEPARATOR);
+      if (splitServer[0].equals(hostname) && splitServer[1].equals(portString)) {
+        i.remove();
+        return server;
+      }
+    }
+    if (regionServers.size() >= noOfRs) {
+      throw new Exception("Server " + hostname + ":" + Integer.toString(port)
+          + " is not in list of online servers(Offline/Incorrect)");
+    }
+    return server;
+  }
+
+  /**
+   * Get Arraylist of Servers in the cluster
+   * @param admin
+   * @return ArrayList of online region servers
+   * @throws IOException
+   */
+  private ArrayList<String> getServers(Admin admin) throws IOException {
+    ArrayList<ServerName> serverInfo =
+        new ArrayList<ServerName>(admin.getClusterStatus().getServers());
+    ArrayList<String> regionServers = new ArrayList<String>();
+    for (ServerName server : serverInfo) {
+      regionServers.add(server.getServerName());
+    }
+    return regionServers;
+  }
+
+  private void deleteFile(String filename) {
+    File f = new File(filename);
+    if (f.exists()) {
+      f.delete();
+    }
+  }
+
+  /**
+   * Tries to scan a row from passed region
+   * @param admin
+   * @param region
+   * @throws IOException
+   */
+  private void isSuccessfulScan(Admin admin, HRegionInfo region) throws IOException {
+    Scan scan = new Scan(region.getStartKey());
+    scan.setBatch(1);
+    scan.setCaching(1);
+    scan.setFilter(new FirstKeyOnlyFilter());
+    try {
+      Table table = admin.getConnection().getTable(region.getTable());
+      try {
+        ResultScanner scanner = table.getScanner(scan);
+        try {
+          scanner.next();
+        } finally {
+          scanner.close();
+        }
+      } finally {
+        table.close();
+      }
+    } catch (IOException e) {
+      LOG.error("Could not scan region:" + region.getEncodedName(), e);
+      throw e;
+    }
+  }
+
+  /**
+   * Returns true if passed region is still on serverName when we look at hbase:meta.
+   * @param admin
+   * @param region
+   * @param serverName
+   * @return true if region is hosted on serverName otherwise false
+   * @throws IOException
+   */
+  private boolean isSameServer(Admin admin, HRegionInfo region, String serverName)
+      throws IOException {
+    String serverForRegion = getServerNameForRegion(admin, region);
+    if (serverForRegion != null && serverForRegion.equals(serverName)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get the servername that is up in hbase:meta hosting the given region. This is hostname +
+   * port + startcode comma-delimited. Can return null.
+   * @param admin
+   * @param region
+   * @return regionServer hosting the given region
+   * @throws IOException
+   */
+  private String getServerNameForRegion(Admin admin, HRegionInfo region) throws IOException {
+    String server = null;
+    if (!admin.isTableEnabled(region.getTable())) {
+      return null;
+    }
+    if (region.isMetaRegion()) {
+      ZooKeeperWatcher zkw = new ZooKeeperWatcher(admin.getConfiguration(), "region_mover", null);
+      MetaTableLocator locator = new MetaTableLocator();
+      int maxWaitInSeconds =
+          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
+      try {
+        server = locator.waitMetaRegionLocation(zkw, maxWaitInSeconds * 1000).toString() + ",";
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while waiting for location of Meta", e);
+      } finally {
+        if (zkw != null) {
+          zkw.close();
+        }
+      }
+    } else {
+      Table table = admin.getConnection().getTable(TableName.META_TABLE_NAME);
+      try {
+        Get get = new Get(region.getRegionName());
+        get.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+        get.addColumn(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
+        Result result = table.get(get);
+        if (result != null) {
+          byte[] servername =
+              result.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+          byte[] startcode =
+              result.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
+          if (servername != null) {
+            server =
+                Bytes.toString(servername).replaceFirst(":", ",") + "," + Bytes.toLong(startcode);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Could not get Server Name for region:" + region.getEncodedName(), e);
+        throw e;
+      } finally {
+        table.close();
+      }
+    }
+    return server;
+  }
+
+  @Override
+  protected void addOptions() {
+    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
+    this.addRequiredOptWithArg("l", "Expected: load/unload");
+    this.addOptWithArg("m", "maxthreads",
+      "Define the maximum number of threads to use to unload and reload the regions");
+    this.addOptWithArg("x", "excludefile",
+      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
+          + "target host; useful for rack decommisioning.");
+    this.addOptWithArg("f", "filename",
+      "File to save regions list into unloading, or read from loading; "
+          + "default /tmp/<usernamehostname:port>");
+    this.addOptNoArg("n", "noAck",
+      "Enable Ack mode(default: true) which checks if region is online on target RegionServer -- "
+          + "Upon disabling,in case a region is stuck, it'll move on anyways");
+    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
+        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    String hostname = cmd.getOptionValue("r");
+    rmbuilder = new RegionMoverBuilder(hostname);
+    if (cmd.hasOption('m')) {
+      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
+    }
+    if (cmd.hasOption('n')) {
+      rmbuilder.ack(false);
+    }
+    if (cmd.hasOption('f')) {
+      rmbuilder.filename(cmd.getOptionValue('f'));
+    }
+    if (cmd.hasOption('x')) {
+      rmbuilder.excludeFile(cmd.getOptionValue('x'));
+    }
+    if (cmd.hasOption('t')) {
+      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
+    }
+    this.loadUnload = cmd.getOptionValue("l").toLowerCase();
+  }
+
+  @Override
+  protected int doWork() throws Exception {
+    RegionMover rm = rmbuilder.build();
+    if (loadUnload.equalsIgnoreCase("load")) {
+      rm.load();
+    } else if (loadUnload.equalsIgnoreCase("unload")) {
+      rm.unload();
+    } else {
+      printUsage();
+      System.exit(1);
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) {
+    new RegionMover().doStaticMain(args);
+  }
+}
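
A minimal sketch of driving the class above programmatically through RegionMoverBuilder. The
host name, port, exclude-file path and configuration values are illustrative and not taken from
the patch; setConf(Configuration) is inherited from AbstractHBaseTool (the test below uses it
the same way):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.util.RegionMover;
  import org.apache.hadoop.hbase.util.RegionMover.RegionMoverBuilder;

  public class RegionMoverExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Tuning keys defined as constants in RegionMover
      conf.setInt(RegionMover.MOVE_RETRIES_MAX_KEY, 5); // per-region retries in ack mode
      conf.setInt(RegionMover.MOVE_WAIT_MAX_KEY, 60);   // seconds to wait for a single move

      // Drain the server; moved regions are recorded in the (default) region file
      RegionMover unloader = new RegionMoverBuilder("rs1.example.com:16020")
          .ack(true)                          // verify each region comes online on its target
          .maxthreads(8)
          .excludeFile("/tmp/exclude_file")   // optional: one <hostname:port> per line
          .build();
      unloader.setConf(conf);
      boolean unloaded = unloader.unload();

      // Later (e.g. after the server restarts) read the file back and reload the regions
      RegionMover loader = new RegionMoverBuilder("rs1.example.com:16020").ack(true).build();
      loader.setConf(conf);
      boolean loaded = loader.load();
      System.out.println("unloaded=" + unloaded + ", loaded=" + loaded);
    }
  }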

http://git-wip-us.apache.org/repos/asf/hbase/blob/939697b4/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java
new file mode 100644
index 0000000..4fd9f65
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.RegionMover.RegionMoverBuilder;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for Region Mover Load/Unload functionality with and without ack mode and also to test
+ * exclude functionality useful for rack decommissioning
+ */
+@Category(MediumTests.class)
+public class TestRegionMover {
+
+  final Log LOG = LogFactory.getLog(getClass());
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // Create a pre-split table just to populate some regions
+    TableName tableName = TableName.valueOf("testRegionMover");
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      TEST_UTIL.deleteTable(tableName);
+    }
+    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+    try {
+      admin.setBalancerRunning(false, true);
+      String startKey = "a";
+      String endKey = "z";
+      admin.createTable(tableDesc, startKey.getBytes(), endKey.getBytes(), 9);
+    } finally {
+      if (admin != null) {
+        admin.close();
+      }
+    }
+  }
+
+  @Test
+  public void testLoadWithAck() throws Exception {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    HRegionServer regionServer = cluster.getRegionServer(0);
+    String rsName = regionServer.getServerName().getHostname();
+    int port = regionServer.getServerName().getPort();
+    int noRegions = regionServer.getNumberOfOnlineRegions();
+    String rs = rsName + ":" + Integer.toString(port);
+    RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(true).maxthreads(8);
+    RegionMover rm = rmBuilder.build();
+    rm.setConf(TEST_UTIL.getConfiguration());
+    LOG.info("Unloading " + rs);
+    rm.unload();
+    assertEquals(0, regionServer.getNumberOfOnlineRegions());
+    LOG.info("Successfully Unloaded\nNow Loading");
+    rm.load();
+    assertEquals(noRegions, regionServer.getNumberOfOnlineRegions());
+  }
+
+  @Test
+  public void testLoadWithoutAck() throws Exception {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    HRegionServer regionServer = cluster.getRegionServer(0);
+    String rsName = regionServer.getServerName().getHostname();
+    int port = regionServer.getServerName().getPort();
+    int noRegions = regionServer.getNumberOfOnlineRegions();
+    String rs = rsName + ":" + Integer.toString(port);
+    RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(false);
+    RegionMover rm = rmBuilder.build();
+    rm.setConf(TEST_UTIL.getConfiguration());
+    LOG.info("Unloading " + rs);
+    rm.unload();
+    assertEquals(0, regionServer.getNumberOfOnlineRegions());
+    LOG.info("Successfully Unloaded\nNow Loading");
+    rm.load();
+    Thread.sleep(100);
+    assertEquals(noRegions, regionServer.getNumberOfOnlineRegions());
+  }
+
+  @Test
+  public void testUnloadWithoutAck() throws Exception {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    HRegionServer regionServer = cluster.getRegionServer(0);
+    String rsName = regionServer.getServerName().getHostname();
+    int port = regionServer.getServerName().getPort();
+    String rs = rsName + ":" + Integer.toString(port);
+    RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(false);
+    RegionMover rm = rmBuilder.build();
+    rm.setConf(TEST_UTIL.getConfiguration());
+    LOG.info("Unloading " + rs);
+    rm.unload();
+    assertEquals(0, regionServer.getNumberOfOnlineRegions());
+  }
+
+  @Test
+  public void testUnloadWithAck() throws Exception {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    HRegionServer regionServer = cluster.getRegionServer(0);
+    String rsName = regionServer.getServerName().getHostname();
+    int port = regionServer.getServerName().getPort();
+    String rs = rsName + ":" + Integer.toString(port);
+    RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(true);
+    RegionMover rm = rmBuilder.build();
+    rm.setConf(TEST_UTIL.getConfiguration());
+    rm.unload();
+    LOG.info("Unloading " + rs);
+    assertEquals(0, regionServer.getNumberOfOnlineRegions());
+  }
+
+  /**
+   * Tests that we successfully exclude a server from the unloading process. We check the number
+   * of regions on the excluded server and also that the other regions are unloaded successfully.
+   * @throws Exception
+   */
+  @Test
+  public void testExclude() throws Exception {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    FileWriter fos = new FileWriter("/tmp/exclude_file");
+    HRegionServer excludeServer = cluster.getRegionServer(1);
+    String excludeHostname = excludeServer.getServerName().getHostname();
+    int excludeServerPort = excludeServer.getServerName().getPort();
+    int regionsExcludeServer = excludeServer.getNumberOfOnlineRegions();
+    String excludeServerName = excludeHostname + ":" + Integer.toString(excludeServerPort);
+    fos.write(excludeServerName);
+    fos.close();
+    HRegionServer regionServer = cluster.getRegionServer(0);
+    String rsName = regionServer.getServerName().getHostname();
+    int port = regionServer.getServerName().getPort();
+    String rs = rsName + ":" + Integer.toString(port);
+    RegionMoverBuilder rmBuilder =
+        new RegionMoverBuilder(rs).ack(true).excludeFile("/tmp/exclude_file");
+    RegionMover rm = rmBuilder.build();
+    rm.setConf(TEST_UTIL.getConfiguration());
+    rm.unload();
+    LOG.info("Unloading " + rs);
+    assertEquals(0, regionServer.getNumberOfOnlineRegions());
+    assertEquals(regionsExcludeServer, cluster.getRegionServer(1).getNumberOfOnlineRegions());
+    LOG.info("Before:" + regionsExcludeServer + " After:"
+        + cluster.getRegionServer(1).getNumberOfOnlineRegions());
+  }
+
+}
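
For reference, the same operations from the command line, using the options registered in
addOptions() above. The flags come from the patch; launching the class through the hbase
script and the host value shown are illustrative assumptions:

  hbase org.apache.hadoop.hbase.util.RegionMover -r rs1.example.com:16020 -l unload -m 8 -x /tmp/exclude_file
  hbase org.apache.hadoop.hbase.util.RegionMover -r rs1.example.com:16020 -l load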


[5/8] hbase git commit: HBASE-12769 Replication fails to delete all corresponding zk nodes when peer is removed (Jianwei Cui)

Posted by sy...@apache.org.
HBASE-12769 Replication fails to delete all corresponding zk nodes when peer is removed (Jianwei Cui)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/210c3dd9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/210c3dd9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/210c3dd9

Branch: refs/heads/hbase-12439
Commit: 210c3dd93748b5de65301f2cca2342f36e169b78
Parents: e24d03b
Author: tedyu <yu...@gmail.com>
Authored: Tue Oct 27 19:40:40 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Oct 27 19:40:40 2015 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |   7 +-
 .../hbase/replication/ReplicationFactory.java   |   7 +-
 .../replication/ReplicationPeersZKImpl.java     |  25 +++-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java |  39 +++++-
 .../hbase/util/hbck/ReplicationChecker.java     | 134 +++++++++++++++++++
 .../replication/TestReplicationAdmin.java       |  34 +++++
 .../hadoop/hbase/util/BaseTestHBaseFsck.java    |   3 +-
 .../hadoop/hbase/util/TestHBaseFsckOneRS.java   |  59 +++++++-
 .../hadoop/hbase/util/hbck/HbckTestingUtil.java |  11 +-
 9 files changed, 303 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index b33e64d..8bd1267 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -125,11 +125,12 @@ public class ReplicationAdmin implements Closeable {
     try {
       zkw = createZooKeeperWatcher();
       try {
-        this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
-        this.replicationPeers.init();
         this.replicationQueuesClient =
             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
         this.replicationQueuesClient.init();
+        this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
+          this.replicationQueuesClient, this.connection);
+        this.replicationPeers.init();
       } catch (Exception exception) {
         if (zkw != null) {
           zkw.close();
@@ -187,7 +188,7 @@ public class ReplicationAdmin implements Closeable {
     this.replicationPeers.addPeer(id,
       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
   }
-
+  
   /**
    * Add a new remote slave cluster for replication.
    * @param id a short name that identifies the cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index f115a39..91e77ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -42,7 +42,12 @@ public class ReplicationFactory {
 
   public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
       Abortable abortable) {
-    return new ReplicationPeersZKImpl(zk, conf, abortable);
+    return getReplicationPeers(zk, conf, null, abortable);
+  }
+  
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
   }
 
   public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index a223531..1884469 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -81,14 +81,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   // Map of peer clusters keyed by their id
   private Map<String, ReplicationPeerZKImpl> peerClusters;
   private final String tableCFsNodeName;
+  private final ReplicationQueuesClient queuesClient;
 
   private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
 
   public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
-      Abortable abortable) {
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
     super(zk, conf, abortable);
     this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
     this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
+    this.queuesClient = queuesClient;
   }
 
   @Override
@@ -116,6 +118,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
         throw new IllegalArgumentException("Found invalid peer name:" + id);
       }
 
+      checkQueuesDeleted(id);
+      
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
@@ -561,5 +565,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     return ProtobufUtil.prependPBMagic(bytes);
   }
 
-
+  private void checkQueuesDeleted(String peerId) throws ReplicationException {
+    if (queuesClient == null) return;
+    try {
+      List<String> replicators = queuesClient.getListOfReplicators();
+      for (String replicator : replicators) {
+        List<String> queueIds = queuesClient.getAllQueues(replicator);
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (queueInfo.getPeerId().equals(peerId)) {
+            throw new ReplicationException("undeleted queue for peerId: " + peerId
+                + ", replicator: " + replicator + ", queueId: " + queueId);
+          }
+        }
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index e55b53f..88c5427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
+import org.apache.hadoop.hbase.util.hbck.ReplicationChecker;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
@@ -246,6 +247,7 @@ public class HBaseFsck extends Configured implements Closeable {
   private boolean fixReferenceFiles = false; // fix lingering reference store file
   private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
   private boolean fixTableLocks = false; // fix table locks which are expired
+  private boolean fixReplication = false; // fix undeleted replication queues for removed peer
   private boolean fixAny = false; // Set to true if any of the fix is required.
 
   // limit checking/fixes to listed tables, if empty attempt to check/fix all
@@ -702,6 +704,8 @@ public class HBaseFsck extends Configured implements Closeable {
 
     checkAndFixTableLocks();
 
+    checkAndFixReplication();
+    
     // Remove the hbck lock
     unlockHbck();
 
@@ -3257,12 +3261,29 @@ public class HBaseFsck extends Configured implements Closeable {
   }
 
   private void checkAndFixTableLocks() throws IOException {
-    TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors);
+    ZooKeeperWatcher zkw = createZooKeeperWatcher();
+    TableLockChecker checker = new TableLockChecker(zkw, errors);
     checker.checkTableLocks();
 
     if (this.fixTableLocks) {
       checker.fixExpiredTableLocks();
     }
+    zkw.close();
+  }
+  
+  private void checkAndFixReplication() throws IOException {
+    ZooKeeperWatcher zkw = createZooKeeperWatcher();
+    try {
+      ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors);
+      checker.checkUnDeletedQueues();
+
+      if (checker.hasUnDeletedQueues() && this.fixReplication) {
+        checker.fixUnDeletedQueues();
+        setShouldRerun();
+      }
+    } finally {
+      zkw.close();
+    }
   }
 
   /**
@@ -3801,7 +3822,7 @@ public class HBaseFsck extends Configured implements Closeable {
       HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
       ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
       WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, ORPHAN_TABLE_STATE,
-      NO_TABLE_STATE
+      NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE
     }
     void clear();
     void report(String message);
@@ -4202,6 +4223,14 @@ public class HBaseFsck extends Configured implements Closeable {
     fixTableLocks = shouldFix;
     fixAny |= shouldFix;
   }
+  
+  /**
+   * Set replication fix mode.
+   */
+  public void setFixReplication(boolean shouldFix) {
+    fixReplication = shouldFix;
+    fixAny |= shouldFix;
+  }
 
   /**
    * Check if we should rerun fsck again. This checks if we've tried to
@@ -4462,6 +4491,10 @@ public class HBaseFsck extends Configured implements Closeable {
     out.println("  Table lock options");
     out.println("   -fixTableLocks    Deletes table locks held for a long time (hbase.table.lock.expire.ms, 10min by default)");
 
+    out.println("");
+    out.println(" Replication options");
+    out.println("   -fixReplication   Deletes replication queues for removed peers");
+    
     out.flush();
     errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
 
@@ -4647,6 +4680,8 @@ public class HBaseFsck extends Configured implements Closeable {
         setRegionBoundariesCheck();
       } else if (cmd.equals("-fixTableLocks")) {
         setFixTableLocks(true);
+      } else if (cmd.equals("-fixReplication")) {
+        setFixReplication(true);
       } else if (cmd.startsWith("-")) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
         return printUsageAndExit();

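As a rough sketch of how the new flag is exercised: on the command line this is "hbase hbck -fixReplication"; programmatically it can be driven through the same entry points the patch touches. The connect()/onlineHbck()/close() calls below are assumed from HBaseFsck's existing public API rather than shown in this diff, so treat this as an illustration, not part of the change:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.HBaseFsck;

public class FixReplicationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ExecutorService exec = Executors.newFixedThreadPool(1);
    HBaseFsck fsck = new HBaseFsck(conf, exec);   // same constructor used in HbckTestingUtil
    try {
      fsck.connect();                  // assumed: standard hbck connection step
      fsck.setFixReplication(true);    // the setter added in this patch
      fsck.onlineHbck();               // assumed driver; runs checkAndFixReplication() with the other checks
    } finally {
      fsck.close();
      exec.shutdownNow();
    }
  }
}
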
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
new file mode 100644
index 0000000..bf44a50
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.hbck;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.util.HBaseFsck;
+import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * Check and fix undeleted replication queues for removed peerId.
+ */
+@InterfaceAudience.Private
+public class ReplicationChecker {
+  private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
+  private ErrorReporter errorReporter;
+  private ReplicationQueuesClient queuesClient;
+  private ReplicationPeers replicationPeers;
+  private ReplicationQueueDeletor queueDeletor;
+  // replicator with its queueIds for removed peers
+  private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
+  
+  public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
+      ErrorReporter errorReporter) throws IOException {
+    try {
+      this.errorReporter = errorReporter;
+      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
+      this.queuesClient.init();
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
+        connection);
+      this.replicationPeers.init();
+      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
+    } catch (ReplicationException e) {
+      throw new IOException("failed to construct ReplicationChecker", e);
+    }
+  }
+
+  public boolean hasUnDeletedQueues() {
+    return errorReporter.getErrorList()
+        .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+  }
+
+  public void checkUnDeletedQueues() throws IOException {
+    Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds());
+    try {
+      List<String> replicators = this.queuesClient.getListOfReplicators();
+      for (String replicator : replicators) {
+        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            if (!undeletedQueueIds.containsKey(replicator)) {
+              undeletedQueueIds.put(replicator, new ArrayList<String>());
+            }
+            undeletedQueueIds.get(replicator).add(queueId);
+
+            String msg = "Undeleted replication queue for removed peer found: "
+                + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+                  queueInfo.getPeerId(), replicator, queueId);
+            errorReporter.reportError(
+              HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException(ke);
+    }
+  }
+  
+  private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
+    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
+      super(zk, conf, abortable);
+    }
+    
+    public void removeQueue(String replicator, String queueId) throws IOException {
+      String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
+        queueId);
+      try {
+        ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
+        LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
+      } catch (KeeperException e) {
+        throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
+            + queueId, e);
+      }
+    }
+  }
+  
+  public void fixUnDeletedQueues() throws IOException {
+    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+      String replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        queueDeletor.removeQueue(replicator, queueId);
+      }
+    }
+  }
+}

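hbck drives this class through checkAndFixReplication() in the HBaseFsck hunk above; written out standalone, the same flow looks roughly like the sketch below. The HConnection and ErrorReporter are assumed to come from a running HBaseFsck instance, and the ZooKeeperWatcher constructor mirrors the one used in the tests further down:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hbase.util.hbck.ReplicationChecker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

class ReplicationCheckSketch {
  static void checkAndFix(Configuration conf, HConnection connection, ErrorReporter errors)
      throws Exception {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "ReplicationCheckSketch", null);
    try {
      ReplicationChecker checker = new ReplicationChecker(conf, zkw, connection, errors);
      checker.checkUnDeletedQueues();      // reports UNDELETED_REPLICATION_QUEUE errors
      if (checker.hasUnDeletedQueues()) {
        checker.fixUnDeletedQueues();      // deletes the orphaned queue znodes
      }
    } finally {
      zkw.close();
    }
  }
}
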
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index e126205..e187b9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -26,8 +26,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -114,6 +117,37 @@ public class TestReplicationAdmin {
     admin.removePeer(ID_SECOND);
     assertEquals(0, admin.getPeersCount());
   }
+  
+  @Test
+  public void testAddPeerWithUnDeletedQueues() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null);
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(zkw, conf, null);
+    repQueues.init("server1");
+
+    // add queue for ID_ONE
+    repQueues.addLog(ID_ONE, "file1");
+    try {
+      admin.addPeer(ID_ONE, KEY_ONE);
+      fail();
+    } catch (ReplicationException e) {
+      // OK!
+    }
+    repQueues.removeQueue(ID_ONE);
+    assertEquals(0, repQueues.getAllQueues().size());
+    
+    // add recovered queue for ID_ONE
+    repQueues.addLog(ID_ONE + "-server2", "file1");
+    try {
+      admin.addPeer(ID_ONE, KEY_ONE);
+      fail();
+    } catch (ReplicationException e) {
+      // OK!
+    }
+    repQueues.removeAllQueues();
+    zkw.close();
+  }
 
   /**
    * basic checks that when we add a peer that it is enabled, and that we can disable

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
index 7459a7d..8e8bb41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
@@ -498,7 +498,8 @@ public class BaseTestHBaseFsck {
 
       // fix hole
       assertErrors(
-          doFsck(conf, false, true, false, false, false, false, false, false, false, false, null),
+          doFsck(conf, false, true, false, false, false, false, false, false, false, false, false,
+            null),
           new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
               ERROR_CODE.NOT_IN_META_OR_DEPLOYED });
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
index a44ccd1..df3c69c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.io.hfile.TestHFile;
 import org.apache.hadoop.hbase.master.AssignmentManager;
@@ -50,10 +51,13 @@ import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
 import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -862,7 +866,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
         // for some time until children references are deleted. HBCK erroneously sees this as
         // overlapping regions
         HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false,
-            false, null);
+            false, false, null);
         assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported
 
         // assert that the split hbase:meta entry is still there.
@@ -941,7 +945,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
 
         // now fix it. The fix should not revert the region split, but add daughters to META
         hbck = doFsck(conf, true, true, false, false, false, false, false, false, false,
-            false, null);
+            false, false, null);
         assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {
             HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
             HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
@@ -1474,4 +1478,55 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
     writeLock.release(); // release for clean state
     tableLockManager.tableDeleted(tableName);
   }
+
+  @Test(timeout=180000)
+  public void testCheckReplication() throws Exception {
+    // check no errors
+    HBaseFsck hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+    
+    // create peer
+    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
+    Assert.assertEquals(0, replicationAdmin.getPeersCount());
+    int zkPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+      HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    replicationAdmin.addPeer("1", "127.0.0.1:" + zkPort + ":/hbase");
+    replicationAdmin.getPeersCount();
+    Assert.assertEquals(1, replicationAdmin.getPeersCount());
+    
+    // create replicator
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection);
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(zkw, conf, connection);
+    repQueues.init("server1");
+    // queues for current peer, no errors
+    repQueues.addLog("1", "file1");
+    repQueues.addLog("1-server2", "file1");
+    Assert.assertEquals(2, repQueues.getAllQueues().size());
+    hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+    
+    // queues for removed peer
+    repQueues.addLog("2", "file1");
+    repQueues.addLog("2-server2", "file1");
+    Assert.assertEquals(4, repQueues.getAllQueues().size());
+    hbck = doFsck(conf, false);
+    assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {
+        HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+        HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE });
+    
+    // fix the case
+    hbck = doFsck(conf, true);
+    hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+    // ensure only the queues for removed peer "2" are deleted
+    Assert.assertEquals(2, repQueues.getAllQueues().size());
+    Assert.assertNull(repQueues.getLogsInQueue("2"));
+    Assert.assertNull(repQueues.getLogsInQueue("2-server2"));
+    
+    replicationAdmin.removePeer("1");
+    repQueues.removeAllQueues();
+    zkw.close();
+    replicationAdmin.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index a28378e..d1e774e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -40,13 +40,13 @@ public class HbckTestingUtil {
 
   public static HBaseFsck doFsck(
       Configuration conf, boolean fix, TableName table) throws Exception {
-    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
+    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
   }
 
-  public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments,
-      boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps,
-      boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile,
-      boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
+  public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
+      boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
+      boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
+      boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, Boolean fixReplication,
       TableName table) throws Exception {
     HBaseFsck fsck = new HBaseFsck(conf, exec);
     try {
@@ -62,6 +62,7 @@ public class HbckTestingUtil {
       fsck.setFixReferenceFiles(fixReferenceFiles);
       fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
       fsck.setFixTableLocks(fixTableLocks);
+      fsck.setFixReplication(fixReplication);
       if (table != null) {
         fsck.includeTable(table);
       }


[7/8] hbase git commit: HBASE-14660 AssertionError found when using offheap BucketCache with assertion enabled (ram)

Posted by sy...@apache.org.
HBASE-14660 AssertionError found when using offheap BucketCache with
assertion enabled (ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/51538c5f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/51538c5f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/51538c5f

Branch: refs/heads/hbase-12439
Commit: 51538c5ff89dd7ee8e32ef4895d10bfc58045b17
Parents: bae94ca9
Author: ramkrishna <ra...@gmail.com>
Authored: Wed Oct 28 15:20:41 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Wed Oct 28 15:20:41 2015 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  33 +-
 .../io/hfile/TestScannerFromBucketCache.java    | 379 +++++++++++++++++++
 .../hadoop/hbase/io/hfile/TestSeekTo.java       |   4 +
 3 files changed, 409 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/51538c5f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 49b6f5e..930f42a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
@@ -930,7 +929,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         // TODO : reduce the varieties of KV here. Check if based on a boolean
         // we can handle the 'no tags' case.
         if (currTagsLen > 0) {
-          if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
+          if (this.curBlock.usesSharedMemory()) {
             ret = new ShareableMemoryKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
               + blockBuffer.position(), getCellBufSize(), seqId);
           } else {
@@ -938,7 +937,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
                     + blockBuffer.position(), cellBufSize, seqId);
           }
         } else {
-          if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
+          if (this.curBlock.usesSharedMemory()) {
             ret = new ShareableMemoryNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
                     + blockBuffer.position(), getCellBufSize(), seqId);
           } else {
@@ -948,11 +947,31 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         }
       } else {
         ByteBuffer buf = blockBuffer.asSubByteBuffer(cellBufSize);
-        if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
-          ret = new ShareableMemoryOffheapKeyValue(buf, buf.position(), cellBufSize,
-            currTagsLen > 0, seqId);
+        if (buf.isDirect()) {
+          if (this.curBlock.usesSharedMemory()) {
+            ret = new ShareableMemoryOffheapKeyValue(buf, buf.position(), cellBufSize,
+                currTagsLen > 0, seqId);
+          } else {
+            ret = new OffheapKeyValue(buf, buf.position(), cellBufSize, currTagsLen > 0, seqId);
+          }
         } else {
-          ret = new OffheapKeyValue(buf, buf.position(), cellBufSize, currTagsLen > 0, seqId);
+          if (this.curBlock.usesSharedMemory()) {
+            if (currTagsLen > 0) {
+              ret = new ShareableMemoryKeyValue(buf.array(), buf.arrayOffset() + buf.position(),
+                  cellBufSize, seqId);
+            } else {
+              ret = new ShareableMemoryNoTagsKeyValue(buf.array(),
+                  buf.arrayOffset() + buf.position(), cellBufSize, seqId);
+            }
+          } else {
+            if (currTagsLen > 0) {
+              ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset() + buf.position(),
+                  cellBufSize, seqId);
+            } else {
+              ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset() + buf.position(),
+                  cellBufSize, seqId);
+            }
+          }
         }
       }
       return ret;

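The gist of the hunk above: a block served from a heap-backed cache is wrapped by an on-heap ByteBuffer, so it must not be handed to the offheap cell implementations (whose assertions expect a direct buffer); the choice of cell type is now gated on ByteBuffer.isDirect(). A minimal, HBase-free illustration of that gating follows (plain NIO only; the class and print statements are made up for the sketch):

import java.nio.ByteBuffer;

public class BufferGatingSketch {
  public static void main(String[] args) {
    describe(ByteBuffer.allocate(16));        // heap-backed, like the "heap" ioengine case
    describe(ByteBuffer.allocateDirect(16));  // direct, like the "offheap" ioengine case
  }

  static void describe(ByteBuffer buf) {
    if (buf.isDirect()) {
      // no backing array here; a cell must read through the buffer (OffheapKeyValue-style)
      System.out.println("direct buffer, hasArray=" + buf.hasArray());
    } else {
      // safe to hand the backing array + arrayOffset to array-based cells (KeyValue-style)
      System.out.println("heap buffer, array length=" + buf.array().length
          + ", arrayOffset=" + buf.arrayOffset());
    }
  }
}
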
http://git-wip-us.apache.org/repos/asf/hbase/blob/51538c5f/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
new file mode 100644
index 0000000..bb6f899
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
@@ -0,0 +1,379 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
+import org.apache.hadoop.hbase.ShareableMemory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestScannerFromBucketCache {
+  private static final Log LOG = LogFactory.getLog(TestScannerFromBucketCache.class);
+  @Rule
+  public TestName name = new TestName();
+
+  HRegion region = null;
+  private HBaseTestingUtility test_util;
+  public Configuration conf;
+  private final int MAX_VERSIONS = 2;
+  byte[] val = new byte[512 * 1024];
+
+  // Test names
+  private TableName tableName;
+
+  private void setUp(boolean offheap, boolean useBucketCache) throws IOException {
+    test_util = HBaseTestingUtility.createLocalHTU();
+    conf = test_util.getConfiguration();
+    if (useBucketCache) {
+      conf.setInt("hbase.bucketcache.size", 400);
+      if (offheap) {
+        conf.setStrings("hbase.bucketcache.ioengine", "offheap");
+      } else {
+        conf.setStrings("hbase.bucketcache.ioengine", "heap");
+      }
+      conf.setInt("hbase.bucketcache.writer.threads", 10);
+      conf.setFloat("hfile.block.cache.size", 0.2f);
+      conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
+    }
+    tableName = TableName.valueOf(name.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentEdgeManagerTestHelper.reset();
+    LOG.info("Cleaning test directory: " + test_util.getDataTestDir());
+    test_util.cleanupTestDir();
+    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
+  }
+
+  String getName() {
+    return name.getMethodName();
+  }
+
+  @Test
+  public void testBasicScanWithLRUCache() throws IOException {
+    setUp(false, false);
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] qf1 = Bytes.toBytes("qualifier1");
+    byte[] qf2 = Bytes.toBytes("qualifier2");
+    byte[] fam1 = Bytes.toBytes("lrucache");
+
+    long ts1 = 1; // System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    // Setting up region
+    String method = this.getName();
+    this.region = initHRegion(tableName, method, conf, test_util, fam1);
+    try {
+      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
+
+      List<Cell> actual = performScan(row1, fam1);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        assertFalse(actual.get(i) instanceof OffheapKeyValue);
+        assertFalse(actual.get(i) instanceof ShareableMemory);
+        assertTrue(CellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
+      }
+      // do the scan again and verify. This time it should be from the lru cache
+      actual = performScan(row1, fam1);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        assertFalse(actual.get(i) instanceof OffheapKeyValue);
+        assertFalse(actual.get(i) instanceof ShareableMemory);
+        assertTrue(CellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
+      }
+
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
+  public void testBasicScanWithOffheapBucketCache() throws IOException {
+    setUp(true, true);
+    byte[] row1 = Bytes.toBytes("row1offheap");
+    byte[] qf1 = Bytes.toBytes("qualifier1");
+    byte[] qf2 = Bytes.toBytes("qualifier2");
+    byte[] fam1 = Bytes.toBytes("famoffheap");
+
+    long ts1 = 1; // System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    // Setting up region
+    String method = this.getName();
+    this.region = initHRegion(tableName, method, conf, test_util, fam1);
+    try {
+      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
+
+      List<Cell> actual = performScan(row1, fam1);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        assertFalse(actual.get(i) instanceof OffheapKeyValue);
+        assertFalse(actual.get(i) instanceof ShareableMemory);
+        assertTrue(CellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
+      }
+      // Wait for the bucket cache threads to move the data to offheap
+      Thread.sleep(500);
+      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
+      actual = performScan(row1, fam1);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        assertTrue(actual.get(i) instanceof OffheapKeyValue);
+        assertTrue(actual.get(i) instanceof ShareableMemory);
+        assertTrue(CellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
+      }
+
+    } catch (InterruptedException e) {
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
+  public void testBasicScanWithOffheapBucketCacheWithMBB() throws IOException {
+    setUp(true, true);
+    byte[] row1 = Bytes.toBytes("row1offheap");
+    byte[] qf1 = Bytes.toBytes("qualifier1");
+    byte[] qf2 = Bytes.toBytes("qualifier2");
+    byte[] fam1 = Bytes.toBytes("famoffheap");
+
+    long ts1 = 1; // System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    // Setting up region
+    String method = this.getName();
+    this.region = initHRegion(tableName, method, conf, test_util, fam1);
+    try {
+      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, true);
+
+      List<Cell> actual = performScan(row1, fam1);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        assertFalse(actual.get(i) instanceof OffheapKeyValue);
+        assertFalse(actual.get(i) instanceof ShareableMemory);
+        assertTrue(CellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
+      }
+      // Wait for the bucket cache threads to move the data to offheap
+      Thread.sleep(500);
+      // do the scan again and verify. This time it should be from the bucket cache in offheap mode
+      // but one of the cells will be copied due to the asSubByteBuffer call
+      Scan scan = new Scan(row1);
+      scan.addFamily(fam1);
+      scan.setMaxVersions(10);
+      actual = new ArrayList<Cell>();
+      InternalScanner scanner = region.getScanner(scan, false);
+
+      boolean hasNext = scanner.next(actual);
+      assertEquals(false, hasNext);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        if (i != 5) {
+          // the last cell fetched will be of type shareable but not offheap because
+          // the MBB is copied to form a single cell
+          assertTrue(actual.get(i) instanceof OffheapKeyValue);
+        }
+        assertTrue(actual.get(i) instanceof ShareableMemory);
+      }
+
+    } catch (InterruptedException e) {
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
+  public void testBasicScanWithOnheapBucketCache() throws IOException {
+    setUp(false, true);
+    byte[] row1 = Bytes.toBytes("row1onheap");
+    byte[] qf1 = Bytes.toBytes("qualifier1");
+    byte[] qf2 = Bytes.toBytes("qualifier2");
+    byte[] fam1 = Bytes.toBytes("famonheap");
+
+    long ts1 = 1; // System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    // Setting up region
+    String method = this.getName();
+    this.region = initHRegion(tableName, method, conf, test_util, fam1);
+    try {
+      List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
+
+      List<Cell> actual = performScan(row1, fam1);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        assertFalse(actual.get(i) instanceof OffheapKeyValue);
+        assertFalse(actual.get(i) instanceof ShareableMemory);
+        assertTrue(CellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
+      }
+      // do the scan again and verify. This time it should be from the bucket cache in onheap mode
+      actual = performScan(row1, fam1);
+      // Verify result
+      for (int i = 0; i < expected.size(); i++) {
+        assertFalse(actual.get(i) instanceof OffheapKeyValue);
+        assertTrue(actual.get(i) instanceof ShareableMemory);
+        assertTrue(CellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
+      }
+
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+    }
+  }
+
+  private List<Cell> insertData(byte[] row1, byte[] qf1, byte[] qf2, byte[] fam1, long ts1,
+      long ts2, long ts3, boolean withVal) throws IOException {
+    // Putting data in Region
+    Put put = null;
+    KeyValue kv13 = null;
+    KeyValue kv12 = null;
+    KeyValue kv11 = null;
+
+    KeyValue kv23 = null;
+    KeyValue kv22 = null;
+    KeyValue kv21 = null;
+    if (!withVal) {
+      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
+      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
+      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
+
+      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
+      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
+      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
+    } else {
+      kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, val);
+      kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, val);
+      kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, val);
+
+      kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, val);
+      kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, val);
+      kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, val);
+    }
+
+    put = new Put(row1);
+    put.add(kv13);
+    put.add(kv12);
+    put.add(kv11);
+    put.add(kv23);
+    put.add(kv22);
+    put.add(kv21);
+    region.put(put);
+    region.flush(true);
+    Store store = region.getStore(fam1);
+    while (store.getStorefilesCount() <= 0) {
+      try {
+        Thread.sleep(20);
+      } catch (InterruptedException e) {
+      }
+    }
+
+    // Expected
+    List<Cell> expected = new ArrayList<Cell>();
+    expected.add(kv13);
+    expected.add(kv12);
+    expected.add(kv23);
+    expected.add(kv22);
+    return expected;
+  }
+
+  private List<Cell> performScan(byte[] row1, byte[] fam1) throws IOException {
+    Scan scan = new Scan(row1);
+    scan.addFamily(fam1);
+    scan.setMaxVersions(MAX_VERSIONS);
+    List<Cell> actual = new ArrayList<Cell>();
+    InternalScanner scanner = region.getScanner(scan, false);
+
+    boolean hasNext = scanner.next(actual);
+    assertEquals(false, hasNext);
+    return actual;
+  }
+
+  private static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
+      HBaseTestingUtility test_util, byte[]... families) throws IOException {
+    return initHRegion(tableName, null, null, callingMethod, conf, test_util, false, families);
+  }
+
+  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
+      String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
+      byte[]... families) throws IOException {
+    Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log");
+    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
+    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
+    return initHRegion(tableName, startKey, stopKey, callingMethod, conf, test_util, isReadOnly,
+      Durability.SYNC_WAL, wal, families);
+  }
+
+  /**
+   * @param tableName
+   * @param startKey
+   * @param stopKey
+   * @param callingMethod
+   * @param conf
+   * @param isReadOnly
+   * @param families
+   * @throws IOException
+   * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
+   *         when done.
+   */
+  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
+      String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
+      Durability durability, WAL wal, byte[]... families) throws IOException {
+    return test_util.createLocalHRegion(tableName, startKey, stopKey, isReadOnly, durability, wal,
+      families);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/51538c5f/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
index bd5b098..c1d91ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.OffheapKeyValue;
+import org.apache.hadoop.hbase.ShareableMemory;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -215,6 +217,8 @@ public class TestSeekTo {
 
     // seekBefore d, so the scanner points to c
     assertTrue(scanner.seekBefore(toKV("d", tagUsage)));
+    assertFalse(scanner.getCell() instanceof ShareableMemory);
+    assertFalse(scanner.getCell() instanceof OffheapKeyValue);
     assertEquals("c", toRowStr(scanner.getCell()));
     // reseekTo e and g
     assertEquals(0, scanner.reseekTo(toKV("c", tagUsage)));


[2/8] hbase git commit: HBASE-14709 Parent change breaks graceful_stop.sh on a cluster

Posted by sy...@apache.org.
HBASE-14709 Parent change breaks graceful_stop.sh on a cluster


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/007e4dfa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/007e4dfa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/007e4dfa

Branch: refs/heads/hbase-12439
Commit: 007e4dfa1384f9746174442a01f512a5744a83da
Parents: 939697b
Author: stack <st...@apache.org>
Authored: Tue Oct 27 16:42:39 2015 -0700
Committer: stack <st...@apache.org>
Committed: Tue Oct 27 16:42:49 2015 -0700

----------------------------------------------------------------------
 bin/graceful_stop.sh | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/007e4dfa/bin/graceful_stop.sh
----------------------------------------------------------------------
diff --git a/bin/graceful_stop.sh b/bin/graceful_stop.sh
index 98b27e3..5eea06c 100755
--- a/bin/graceful_stop.sh
+++ b/bin/graceful_stop.sh
@@ -84,7 +84,7 @@ fi
 hostname=$1
 filename="/tmp/$hostname"
 
-local=false
+local=
 localhostname=`/bin/hostname`
 
 if [ "$localhostname" == "$hostname" ]; then
@@ -103,7 +103,7 @@ log "Unloaded $hostname region(s)"
 hosts="/tmp/$(basename $0).$$.tmp"
 echo $hostname >> $hosts
 if [ "$thrift" != "" ]; then
-  log "Stopping thrift"
+  log "Stopping thrift server on $hostname"
   if [ "$local" ]; then
     "$bin"/hbase-daemon.sh --config ${HBASE_CONF_DIR} stop thrift
   else
@@ -111,28 +111,28 @@ if [ "$thrift" != "" ]; then
   fi
 fi
 if [ "$rest" != "" ]; then
-  log "Stopping rest"
+  log "Stopping rest server on $hostname"
   if [ "$local" ]; then
     "$bin"/hbase-daemon.sh --config ${HBASE_CONF_DIR} stop rest
   else
     "$bin"/hbase-daemons.sh --config ${HBASE_CONF_DIR} --hosts ${hosts} stop rest
   fi
 fi
-log "Stopping regionserver"
+log "Stopping regionserver on $hostname"
 if [ "$local" ]; then
   "$bin"/hbase-daemon.sh --config ${HBASE_CONF_DIR} stop regionserver
 else
   "$bin"/hbase-daemons.sh --config ${HBASE_CONF_DIR} --hosts ${hosts} stop regionserver
 fi
 if [ "$restart" != "" ]; then
-  log "Restarting regionserver"
+  log "Restarting regionserver on $hostname"
   if [ "$local" ]; then
     "$bin"/hbase-daemon.sh --config ${HBASE_CONF_DIR} start regionserver
   else
     "$bin"/hbase-daemons.sh --config ${HBASE_CONF_DIR} --hosts ${hosts} start regionserver
   fi
   if [ "$thrift" != "" ]; then
-    log "Restarting thrift"
+    log "Restarting thrift server on $hostname"
     # -b 0.0.0.0 says listen on all interfaces rather than just default.
     if [ "$local" ]; then
       "$bin"/hbase-daemon.sh --config ${HBASE_CONF_DIR} start thrift -b 0.0.0.0
@@ -141,7 +141,7 @@ if [ "$restart" != "" ]; then
     fi
   fi
   if [ "$rest" != "" ]; then
-    log "Restarting rest"
+    log "Restarting rest server on $hostname"
     if [ "$local" ]; then
       "$bin"/hbase-daemon.sh --config ${HBASE_CONF_DIR} start rest
     else


[6/8] hbase git commit: HBASE-14529 - Addendum to avoid SIGHUP call for WINDOWS OS

Posted by sy...@apache.org.
HBASE-14529 - Addendum to avoid SIGHUP call for WINDOWS OS


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bae94ca9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bae94ca9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bae94ca9

Branch: refs/heads/hbase-12439
Commit: bae94ca9729bec2e0fe3556a31e29a2444737b1f
Parents: 210c3dd
Author: ramkrishna <ra...@gmail.com>
Authored: Wed Oct 28 11:52:20 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Wed Oct 28 11:52:20 2015 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegionServer.java     | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bae94ca9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8a5e423..0234769 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -52,6 +52,7 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.servlet.http.HttpServlet;
 
+import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang.math.RandomUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -606,12 +607,14 @@ public class HRegionServer extends HasThread implements
     this.walRoller = new LogRoller(this, this);
     this.choreService = new ChoreService(getServerName().toString());
 
-    Signal.handle(new Signal("HUP"), new SignalHandler() {
-      public void handle(Signal signal) {
-        getConfiguration().reloadConfiguration();
-        configurationManager.notifyAllObservers(getConfiguration());
-      }
-    });
+    if (!SystemUtils.IS_OS_WINDOWS) {
+      Signal.handle(new Signal("HUP"), new SignalHandler() {
+        public void handle(Signal signal) {
+          getConfiguration().reloadConfiguration();
+          configurationManager.notifyAllObservers(getConfiguration());
+        }
+      });
+    }
   }
 
   protected TableDescriptors getFsTableDescriptors() throws IOException {


[4/8] hbase git commit: HBASE-14695 Fix some easy HTML warnings and bad links

Posted by sy...@apache.org.
HBASE-14695 Fix some easy HTML warnings and bad links


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e24d03b1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e24d03b1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e24d03b1

Branch: refs/heads/hbase-12439
Commit: e24d03b10c34cca4e51d037ae51fef4eca1666de
Parents: 0e6dd32
Author: Misty Stanley-Jones <ms...@cloudera.com>
Authored: Mon Oct 26 10:11:10 2015 +1000
Committer: Misty Stanley-Jones <ms...@cloudera.com>
Committed: Wed Oct 28 12:12:40 2015 +1000

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/package-info.java    |  2 +-
 .../org/apache/hadoop/hbase/util/BloomFilter.java   |  2 +-
 .../org/apache/hadoop/hbase/util/FSMapRUtils.java   |  4 ++--
 pom.xml                                             | 16 ++++++++++++----
 src/main/asciidoc/_chapters/ops_mgt.adoc            |  3 ++-
 src/main/asciidoc/_chapters/performance.adoc        |  2 +-
 src/main/site/site.xml                              |  2 +-
 src/main/site/xdoc/index.xml                        |  8 ++++----
 8 files changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/package-info.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/package-info.java
index cf28c91..dd85d3e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/package-info.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/package-info.java
@@ -200,7 +200,7 @@ public class MyLittleHBaseClient {
 
 <h2><a name="related" >Related Documentation</a></h2>
 <ul>
-  <li><a href="http://hbase.org">HBase Home Page</a>
+  <li><a href="http://hbase.org/">HBase Home Page</a>
   <li><a href="http://hadoop.apache.org/">Hadoop Home Page</a>
 </ul>
   <p>See also the section in the HBase Reference Guide where it discusses

http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
index 197ff12..2062244 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
  * trade-off between the false positive rate and the size.
  *
  * <p>
- * Originally inspired by <a href="http://www.one-lab.org">European Commission
+ * Originally inspired by <a href="http://www.one-lab.org/">European Commission
  * One-Lab Project 034819</a>.
  *
  * Bloom filters are very sensitive to the number of elements inserted into

http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
index d04c07d..5214cc5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
@@ -29,12 +29,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * <a href="http://www.mapr.com">MapR</a> implementation.
+ * <a href="http://www.mapr.com/">MapR</a> implementation.
  */
 @InterfaceAudience.Private
 public class FSMapRUtils extends FSUtils {
   private static final Log LOG = LogFactory.getLog(FSMapRUtils.class);
-  
+
   public void recoverFileLease(final FileSystem fs, final Path p,
       Configuration conf, CancelableProgressable reporter) throws IOException {
     LOG.info("Recovering file " + p.toString() +

http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eab8eb2..5bcdbc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2804,8 +2804,12 @@
               <destDir>devapidocs</destDir>
               <name>Developer API</name>
               <description>The full HBase API, including private and unstable APIs</description>
-              <sourceFilesExclude>**/generated/*</sourceFilesExclude>
-              <excludePackageNames>org.apache.hadoop.hbase.generated.master:org.apache.hadoop.hbase.protobuf.generated:org.apache.hadoop.hbase.tmpl.common</excludePackageNames>
+              <sourceFileExcludes>
+                <exclude>**/generated/*</exclude>
+                <exclude>**/protobuf/*</exclude>
+                <exclude>**/*.scala</exclude>
+              </sourceFileExcludes>
+              <excludePackageNames>*.generated.master:*.generated:org.apache.hadoop.hbase.tmpl.common:com.google.protobuf:org.apache.hadoop.hbase.spark</excludePackageNames>
               <quiet>true</quiet>
               <linksource>true</linksource>
               <sourcetab>2</sourcetab>
@@ -2842,8 +2846,12 @@
               <destDir>testdevapidocs</destDir>
               <name>Developer API</name>
               <description>The full HBase API, including private and unstable APIs</description>
-              <sourceFilesExclude>**/generated/*</sourceFilesExclude>
-              <excludePackageNames>org.apache.hadoop.hbase.generated.master:org.apache.hadoop.hbase.protobuf.generated:org.apache.hadoop.hbase.tmpl.common</excludePackageNames>
+              <sourceFileExcludes>
+                <exclude>**/generated/*</exclude>
+                <exclude>**/protobuf/*</exclude>
+                <exclude>**/*.scala</exclude>
+              </sourceFileExcludes>
+              <excludePackageNames>*.generated.master:*.generated:org.apache.hadoop.hbase.tmpl.common:com.google.protobuf:org.apache.hadoop.hbase.spark</excludePackageNames>
               <quiet>true</quiet>
               <linksource>true</linksource>
               <sourcetab>2</sourcetab>

http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/src/main/asciidoc/_chapters/ops_mgt.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index af99215..9c592eb 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -1252,7 +1252,8 @@ Have a look in the Web UI.
 
 == Cluster Replication
 
-NOTE: This information was previously available at link:http://hbase.apache.org/replication.html[Cluster Replication].
+NOTE: This information was previously available at
+link:http://hbase.apache.org#replication[Cluster Replication].
 
 HBase provides a cluster replication mechanism which allows you to keep one cluster's state synchronized with that of another cluster, using the write-ahead log (WAL) of the source cluster to propagate the changes.
 Some use cases for cluster replication include:

http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/src/main/asciidoc/_chapters/performance.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/performance.adoc b/src/main/asciidoc/_chapters/performance.adoc
index 78edccb..3275fa3 100644
--- a/src/main/asciidoc/_chapters/performance.adoc
+++ b/src/main/asciidoc/_chapters/performance.adoc
@@ -676,7 +676,7 @@ Enabling Bloom Filters can save your having to go to disk and can help improve r
 link:http://en.wikipedia.org/wiki/Bloom_filter[Bloom filters] were developed over in link:https://issues.apache.org/jira/browse/HBASE-1200[HBase-1200 Add bloomfilters].
 For description of the development process -- why static blooms rather than dynamic -- and for an overview of the unique properties that pertain to blooms in HBase, as well as possible future directions, see the _Development Process_ section of the document link:https://issues.apache.org/jira/secure/attachment/12444007/Bloom_Filters_in_HBase.pdf[BloomFilters in HBase] attached to link:https://issues.apache.org/jira/browse/HBASE-1200[HBASE-1200].
 The bloom filters described here are actually version two of blooms in HBase.
-In versions up to 0.19.x, HBase had a dynamic bloom option based on work done by the link:http://www.one-lab.org[European Commission One-Lab Project 034819].
+In versions up to 0.19.x, HBase had a dynamic bloom option based on work done by the link:http://www.one-lab.org/[European Commission One-Lab Project 034819].
 The core of the HBase bloom work was later pulled up into Hadoop to implement org.apache.hadoop.io.BloomMapFile.
 Version 1 of HBase blooms never worked that well.
 Version 2 is a rewrite from scratch though again it starts with the one-lab work.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/src/main/site/site.xml
----------------------------------------------------------------------
diff --git a/src/main/site/site.xml b/src/main/site/site.xml
index a86bd18..62caf79 100644
--- a/src/main/site/site.xml
+++ b/src/main/site/site.xml
@@ -58,7 +58,7 @@
       <item name="Blog" href="http://blogs.apache.org/hbase/"/>
       <item name="Mailing Lists" href="mail-lists.html"/>
       <item name="Team" href="team-list.html"/>
-      <item name="ReviewBoard" href="https://reviews.apache.org"/>
+      <item name="ReviewBoard" href="https://reviews.apache.org/"/>
       <item name="Thanks" href="sponsors.html"/>
       <item name="Powered by HBase" href="poweredbyhbase.html"/>
       <item name="Other resources" href="resources.html"/>

http://git-wip-us.apache.org/repos/asf/hbase/blob/e24d03b1/src/main/site/xdoc/index.xml
----------------------------------------------------------------------
diff --git a/src/main/site/xdoc/index.xml b/src/main/site/xdoc/index.xml
index 9b2d4b9..a4e1ee6 100644
--- a/src/main/site/xdoc/index.xml
+++ b/src/main/site/xdoc/index.xml
@@ -27,7 +27,7 @@ under the License.
 
   <body>
     <section name="Welcome to Apache HBase&#8482;">
-        <p><a href="http://www.apache.org/">Apache</a> HBase&#8482; is the <a href="http://hadoop.apache.org">Hadoop</a> database, a distributed, scalable, big data store.
+        <p><a href="http://www.apache.org/">Apache</a> HBase&#8482; is the <a href="http://hadoop.apache.org/">Hadoop</a> database, a distributed, scalable, big data store.
     </p>
     <h4>Download Apache HBase&#8482;</h4>
     <p>
@@ -68,7 +68,7 @@ Apache HBase is an open-source, distributed, versioned, non-relational database
 </ul>
 </p>
      <h4>Where Can I Get More Information?</h4>
-   <p>See the <a href="http://hbase.apache.org/book/architecture.html#arch.overview">Architecture Overview</a>, the <a href="http://hbase.apache.org/book/faq.html">Apache HBase Reference Guide FAQ</a>,
+   <p>See the <a href="http://hbase.apache.org/book#arch.overview">Architecture Overview</a>, the <a href="http://hbase.apache.org/book#faq">Apache HBase Reference Guide FAQ</a>,
     and the other documentation links on the left!
    </p>
      <h4>Export Control</h4>
@@ -85,12 +85,12 @@ Apache HBase is an open-source, distributed, versioned, non-relational database
        <p>January 15th, 2015 <a href="http://www.meetup.com/hbaseusergroup/events/218744798/">HBase meetup @ AppDynamics</a> in San Francisco</p>
        <p>November 20th, 2014 <a href="http://www.meetup.com/hbaseusergroup/events/205219992/">HBase meetup @ WANdisco</a> in San Ramon</p>
        <p>October 27th, 2014 <a href="http://www.meetup.com/hbaseusergroup/events/207386102/">HBase Meetup @ Apple</a> in Cupertino</p>
-       <p>October 15th, 2014 <a href="http://www.meetup.com/HBase-NYC/events/207655552">HBase Meetup @ Google</a> on the night before Strata/HW in NYC</p>
+       <p>October 15th, 2014 <a href="http://www.meetup.com/HBase-NYC/events/207655552/">HBase Meetup @ Google</a> on the night before Strata/HW in NYC</p>
        <p>September 25th, 2014 <a href="http://www.meetup.com/hbaseusergroup/events/203173692/">HBase Meetup @ Continuuity</a> in Palo Alto</p>
          <p>August 28th, 2014 <a href="http://www.meetup.com/hbaseusergroup/events/197773762/">HBase Meetup @ Sift Science</a> in San Francisco</p>
          <p>July 17th, 2014 <a href="http://www.meetup.com/hbaseusergroup/events/190994082/">HBase Meetup @ HP</a> in Sunnyvale</p>
          <p>June 5th, 2014 <a href="http://www.meetup.com/Hadoop-Summit-Community-San-Jose/events/179081342/">HBase BOF at Hadoop Summit</a>, San Jose Convention Center</p>
-         <p>May 5th, 2014 <a href="http://www.hbasecon.com">HBaseCon2014</a> at the Hilton San Francisco on Union Square</p>
+         <p>May 5th, 2014 <a href="http://www.hbasecon.com/">HBaseCon2014</a> at the Hilton San Francisco on Union Square</p>
          <p>March 12th, 2014 <a href="http://www.meetup.com/hbaseusergroup/events/160757912/">HBase Meetup @ Ancestry.com</a> in San Francisco</p>
       <p><small><a href="old_news.html">Old News</a></small></p>
     </section>


[8/8] hbase git commit: HBASE-14701 Fix flakey Failed tests: TestMobFlushSnapshotFromClient>TestFlushSnapshotFromClient.testSkipFlushTableSnapshot:199 null; DISABLING

Posted by sy...@apache.org.
HBASE-14701 Fix flakey Failed tests: TestMobFlushSnapshotFromClient>TestFlushSnapshotFromClient.testSkipFlushTableSnapshot:199 null; DISABLING


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4b018d2a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4b018d2a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4b018d2a

Branch: refs/heads/hbase-12439
Commit: 4b018d2a3988a70d98e2388d0013e63857c5e193
Parents: 51538c5
Author: stack <st...@apache.org>
Authored: Wed Oct 28 10:56:37 2015 -0700
Committer: stack <st...@apache.org>
Committed: Wed Oct 28 10:56:37 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4b018d2a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
index fca9f16..b30bcc9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
@@ -54,6 +54,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -165,7 +166,7 @@ public class TestFlushSnapshotFromClient {
    * Test snapshotting a table that is online without flushing
    * @throws Exception
    */
-  @Test(timeout=30000)
+  @Ignore ("Flakey test") @Test(timeout=30000)
   public void testSkipFlushTableSnapshot() throws Exception {
     Admin admin = UTIL.getHBaseAdmin();
     // make sure we don't fail on listing snapshots
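 
For context, disabling a test this way relies on JUnit 4's @Ignore annotation: the runner skips the method but the @Test metadata and body stay in place so the test can be re-enabled later. A minimal standalone sketch (class and method names below are made up for illustration, not part of the patch):

import org.junit.Ignore;
import org.junit.Test;

public class FlakeyExampleTest {

  // Runs normally under the 30 second timeout.
  @Test(timeout = 30000)
  public void testStable() throws Exception {
    // ... assertions ...
  }

  // Skipped by the JUnit runner; the reason string shows up in reports.
  @Ignore("Flakey test")
  @Test(timeout = 30000)
  public void testSkipFlushTableSnapshot() throws Exception {
    // ... body left intact so the test can be re-enabled once fixed ...
  }
}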


[3/8] hbase git commit: HBASE-14425 In Secure Zookeeper cluster superuser will not have sufficient permission if multiple values are configured in hbase.superuser (Pankaj Kumar)

Posted by sy...@apache.org.
HBASE-14425 In Secure Zookeeper cluster superuser will not have sufficient permission if multiple values are configured in hbase.superuser (Pankaj Kumar)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0e6dd325
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0e6dd325
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0e6dd325

Branch: refs/heads/hbase-12439
Commit: 0e6dd3257b1bebe3e12c84aace59dd9cf0dcac2b
Parents: 007e4df
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 27 16:56:20 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 27 16:56:20 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   | 20 +++++++--
 .../hbase/zookeeper/ZooKeeperWatcher.java       | 45 +++++++++++++++++++-
 .../hadoop/hbase/zookeeper/TestZKUtil.java      | 24 ++++++++++-
 .../test/IntegrationTestZKAndFSPermissions.java |  3 +-
 4 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
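The gist of the change: hbase.superuser accepts a comma-separated list of users and "@"-prefixed groups, but the old code read the whole value with get() and granted Perms.ALL to that single concatenated Id, so with multiple entries no real superuser ended up with sufficient znode permissions. The patch splits the value, adds one "auth" ACL per user, and skips group entries because ZooKeeper cannot set znode ACLs for groups. A minimal sketch of the resulting behaviour, assuming a plain HBase configuration (the class name below is made up):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

public class SuperuserAclSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Multiple superusers; "@"-prefixed entries are groups.
    conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2");

    List<ACL> acls = new ArrayList<ACL>();
    // getStrings() splits the comma-separated value; the old code used get()
    // and handed the whole string to a single Id.
    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
    if (superUsers != null) {
      for (String user : superUsers) {
        if (user.startsWith(AuthUtil.GROUP_PREFIX)) {
          continue; // ZooKeeper cannot set znode ACLs for groups yet
        }
        acls.add(new ACL(Perms.ALL, new Id("auth", user)));
      }
    }
    System.out.println(acls); // expected: full-permission ACLs for user1 and user2 only
  }
}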


http://git-wip-us.apache.org/repos/asf/hbase/blob/0e6dd325/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 27c3bba..633525f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -1028,11 +1030,23 @@ public class ZKUtil {
       return Ids.OPEN_ACL_UNSAFE;
     }
     if (isSecureZooKeeper) {
-      String superUser = zkw.getConfiguration().get("hbase.superuser");
       ArrayList<ACL> acls = new ArrayList<ACL>();
+      // add permission for the hbase super user
-      if (superUser != null) {
-        acls.add(new ACL(Perms.ALL, new Id("auth", superUser)));
+      String[] superUsers = zkw.getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY);
+      if (superUsers != null) {
+        List<String> groups = new ArrayList<String>();
+        for (String user : superUsers) {
+          if (user.startsWith(AuthUtil.GROUP_PREFIX)) {
+            // TODO: Set node ACL for groups when ZK supports this feature
+            groups.add(user);
+          } else {
+            acls.add(new ACL(Perms.ALL, new Id("auth", user)));
+          }
+        }
+        if (!groups.isEmpty()) {
+          LOG.warn("Znode ACL setting for group " + groups
+              + " is skipped, Zookeeper doesn't support this feature presently.");
+        }
       }
       // Certain znodes are accessed directly by the client,
       // so they must be readable by non-authenticated clients

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e6dd325/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 220a679..f7a2175 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -31,10 +31,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -257,7 +259,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
    * @throws IOException
    */
   private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
-    String superUser = conf.get("hbase.superuser");
+    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
+    // Check whether ACLs are set for all superusers
+    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
+      return false;
+    }
 
     // this assumes that current authenticated user is the same as zookeeper client user
     // configured via JAAS
@@ -276,7 +282,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
         if (perms != Perms.READ) {
           return false;
         }
-      } else if (superUser != null && new Id("sasl", superUser).equals(id)) {
+      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
         if (perms != Perms.ALL) {
           return false;
         }
@@ -290,6 +296,41 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
     }
     return true;
   }
+  
+  /*
+   * Validate whether ACLs are set for all superusers.
+   */
+  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
+    for (String user : superUsers) {
+      boolean hasAccess = false;
+      // TODO: Also validate super group members when ZK supports setting node ACLs for groups.
+      if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
+        for (ACL acl : acls) {
+          if (user.equals(acl.getId().getId()) && acl.getPerms() == Perms.ALL) {
+            hasAccess = true;
+            break;
+          }
+        }
+        if (!hasAccess) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+  
+  /*
+   * Validate whether the given ACL Id belongs to a superuser.
+   */
+  public static boolean isSuperUserId(String[] superUsers, Id id) {
+    for (String user : superUsers) {
+      // TODO: Also validate super group members when ZK supports setting node ACLs for groups.
+      if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
+        return true;
+      }
+    }
+    return false;
+  }
 
   @Override
   public String toString() {
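 
The new ZooKeeperWatcher.isSuperUserId helper is public and static so other callers (such as the integration test further below) can reuse it; callers are expected to split the configured value into a String[] themselves. A hedged usage sketch (again, the wrapper class is illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.data.Id;

public class SuperUserIdCheckSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2");

    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
    // "sasl" Ids are what getACL() returns for authenticated users on a znode.
    boolean isSuper = superUsers != null
        && ZooKeeperWatcher.isSuperUserId(superUsers, new Id("sasl", "user1"));
    System.out.println(isSuper); // expected: true; "@group1" entries are ignored
  }
}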

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e6dd325/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 7224507..72de935 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -18,11 +18,18 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -43,4 +50,19 @@ public class TestZKUtil {
     Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
     Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
   }
+  
+  @Test
+  public void testCreateACL() throws ZooKeeperConnectionException, IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3");
+    String node = "/hbase/testCreateACL";
+    ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, node, null, false);
+    List<ACL> aclList = ZKUtil.createACL(watcher, node, true);
+    Assert.assertEquals(aclList.size(), 4); // 3+1, since ACL will be set for the creator by default
+    Assert.assertTrue(!aclList.contains(new ACL(Perms.ALL, new Id("auth", "@group1")))
+        && !aclList.contains(new ACL(Perms.ALL, new Id("auth", "@group2"))));
+    Assert.assertTrue(aclList.contains(new ACL(Perms.ALL, new Id("auth", "user1")))
+        && aclList.contains(new ACL(Perms.ALL, new Id("auth", "user2")))
+        && aclList.contains(new ACL(Perms.ALL, new Id("auth", "user3"))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e6dd325/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
index c39056d..3845846 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
@@ -178,6 +178,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
       boolean expectedWorldReadable) throws KeeperException, InterruptedException {
     Stat stat = new Stat();
     List<ACL> acls = zk.getZooKeeper().getACL(znode, stat);
+    String[] superUsers = superUser == null ? null : superUser.split(",");
 
     LOG.info("Checking ACLs for znode znode:" + znode + " acls:" + acls);
 
@@ -191,7 +192,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
         assertTrue(expectedWorldReadable);
         // assert that anyone can only read
         assertEquals(perms, Perms.READ);
-      } else if (superUser != null && new Id("sasl", superUser).equals(id)) {
+      } else if (superUsers != null && ZooKeeperWatcher.isSuperUserId(superUsers, id)) {
         // assert that super user has all the permissions
         assertEquals(perms, Perms.ALL);
       } else if (new Id("sasl", masterPrincipal).equals(id)) {