You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/25 02:01:26 UTC

svn commit: r1377155 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/coprocessor/example/ test/java/org/apache/hadoop/hbase/coprocessor/example/

Author: larsh
Date: Sat Aug 25 00:01:26 2012
New Revision: 1377155

URL: http://svn.apache.org/viewvc?rev=1377155&view=rev
Log:
HBASE-6496 Example ZK based scan policy

Added:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java?rev=1377155&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java Sat Aug 25 00:01:26 2012
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This is an example showing how a RegionObserver could configured
+ * via ZooKeeper in order to control a Region compaction, flush, and scan policy.
+ *
+ * This also demonstrated the use of shared {@link RegionObserver} state.
+ * See {@link RegionCoprocessorEnvironment#getSharedData()}.
+ *
+ * This would be useful for an incremental backup tool, which would indicate the last
+ * time of a successful backup via ZK and instruct HBase to not delete data that was
+ * inserted since (based on wall clock time). 
+ *
+ * This implements org.apache.zookeeper.Watcher directly instead of using
+ * {@link ZooKeeperWatcher}, because RegionObservers come and go and currently
+ * listeners registered with ZooKeeperWatcher cannot be removed.
+ */
+public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
+  public static String node = "/backup/example/lastbackup";
+  public static String zkkey = "ZK";
+  private static final Log LOG = LogFactory.getLog(ZooKeeperScanPolicyObserver.class);
+
+  /**
+   * Internal watcher that keep "data" up to date asynchronously.
+   */
+  private static class ZKWatcher implements Watcher {
+    private byte[] data = null;
+    private ZooKeeper zk;
+    private volatile boolean needSetup = true;
+    private volatile long lastSetupTry = 0;
+
+    public ZKWatcher(ZooKeeper zk) {
+      this.zk = zk;
+      // trigger the listening
+      getData();
+    }
+
+    /**
+     * Get the maintained data. In case of any ZK exceptions this will retry
+     * establishing the connection (but not more than twice/minute).
+     *
+     * getData is on the critical path, so make sure it is fast unless there is
+     * a problem (network partion, ZK ensemble down, etc)
+     * Make sure at most one (unlucky) thread retries and other threads don't pile up
+     * while that threads tries to recreate the connection.
+     *
+     * @return the last know version of the data
+     */
+    public byte[] getData() {
+      // try at most twice/minute
+      if (needSetup && EnvironmentEdgeManager.currentTimeMillis() > lastSetupTry + 30000) {
+        synchronized (this) {
+          // make sure only one thread tries to reconnect
+          if (needSetup) {
+            needSetup = false;
+          } else {
+            return data;
+          }
+        }
+        // do this without the lock held to avoid threads piling up on this lock,
+        // as it can take a while
+        try {
+          LOG.debug("Connecting to ZK");
+          // record this attempt
+          lastSetupTry = EnvironmentEdgeManager.currentTimeMillis();
+          if (zk.exists(node, false) != null) {
+            data = zk.getData(node, this, null);
+            LOG.debug("Read synchronously: "+(data == null ? "null" : Bytes.toLong(data)));
+          } else {
+            zk.exists(node, this);
+          }
+        } catch (Exception x) {
+          // try again if this fails
+          needSetup = true;
+        }
+      }
+      return data;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      switch(event.getType()) {
+      case NodeDataChanged:
+      case NodeCreated:
+      try {
+        // get data and re-watch
+        data = zk.getData(node, this, null);
+        LOG.debug("Read asynchronously: "+(data == null ? "null" : Bytes.toLong(data)));
+      } catch (InterruptedException ix) {
+      } catch (KeeperException kx) {
+        needSetup = true;
+      }
+      break;
+
+      case NodeDeleted:
+      try {
+        // just re-watch
+        zk.exists(node, this);
+        data = null;
+      } catch (InterruptedException ix) {
+      } catch (KeeperException kx) {
+        needSetup = true;
+      }
+      break;
+
+      default:
+        // ignore
+      }
+    }
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    RegionCoprocessorEnvironment re = (RegionCoprocessorEnvironment) e;
+    if (!re.getSharedData().containsKey(zkkey)) {
+      // there is a short race here
+      // in the worst case we create a watcher that will be notified once
+      re.getSharedData().putIfAbsent(
+          zkkey,
+          new ZKWatcher(re.getRegionServerServices().getZooKeeper()
+              .getRecoverableZooKeeper().getZooKeeper()));
+    }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    // nothing to do here
+  }
+
+  protected ScanInfo getScanInfo(Store store, RegionCoprocessorEnvironment e) {
+    byte[] data = ((ZKWatcher)e.getSharedData().get(zkkey)).getData();
+    if (data == null) {
+      return null;
+    }
+    ScanInfo oldSI = store.getScanInfo();
+    if (oldSI.getTtl() == Long.MAX_VALUE) {
+      return null;
+    }
+    long ttl =  Math.max(EnvironmentEdgeManager.currentTimeMillis() - Bytes.toLong(data), oldSI.getTtl());    
+    return new ScanInfo(store.getFamily(), ttl,
+        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+  }
+
+  @Override
+  public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+    Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
+    if (scanInfo == null) {
+      // take default action
+      return null;
+    }
+    Scan scan = new Scan();
+    scan.setMaxVersions(scanInfo.getMaxVersions());
+    return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+        ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
+        HConstants.OLDEST_TIMESTAMP);
+  }
+
+  @Override
+  public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+      InternalScanner s) throws IOException {
+    Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
+    if (scanInfo == null) {
+      // take default action
+      return null;
+    }
+    Scan scan = new Scan();
+    scan.setMaxVersions(scanInfo.getMaxVersions());
+    return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
+        .getSmallestReadPoint(), earliestPutTs);
+  }
+
+  @Override
+  public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+      final KeyValueScanner s) throws IOException {
+    Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
+    if (scanInfo == null) {
+      // take default action
+      return null;
+    }
+    return new StoreScanner(store, scanInfo, scan, targetCols);
+  }
+}

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java?rev=1377155&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java Sat Aug 25 00:01:26 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests {@link ZooKeeperScanPolicyObserver}: cells older than the family TTL must survive
+ * reads, flushes and major compactions while they are newer than the backup time stored
+ * in ZooKeeper. They must be dropped once the backup time moves past them.
+ */
+public class TestZooKeeperScanPolicyObserver {
+  private static final Log LOG = LogFactory.getLog(TestZooKeeperScanPolicyObserver.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] F = Bytes.toBytes("fam");
+  private static final byte[] Q = Bytes.toBytes("qual");
+  private static final byte[] R = Bytes.toBytes("row");
+  /**
+   * Upper bound for the observer to pick up a ZK change. Its watcher is notified
+   * asynchronously.
+   */
+  private static final long ZK_UPDATE_TIMEOUT_MS = 30000;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // register the observer as a region coprocessor via the configuration,
+    // then start the ZK cluster by itself before the HBase cluster
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        ZooKeeperScanPolicyObserver.class.getName());
+    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testScanPolicyObserver() throws Exception {
+    byte[] tableName = Bytes.toBytes("testScanPolicyObserver");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(F)
+    .setMaxVersions(10)
+    .setTimeToLive(1);
+    desc.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(desc);
+    HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+    try {
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+
+      ZooKeeperWatcher zkw = HConnectionManager.getConnection(TEST_UTIL.getConfiguration())
+          .getZooKeeperWatcher();
+      ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper();
+      ZKUtil.createWithParents(zkw, ZooKeeperScanPolicyObserver.node);
+      // let's say the last backup was 1h ago
+      // using plain ZK here, because RecoverableZooKeeper adds extra encoding to the data
+      long lastBackup = now - 3600 * 1000L;
+      zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(lastBackup), -1);
+      LOG.debug("Set time: " + lastBackup);
+
+      long ts = now - 2000;
+      Put p = new Put(R);
+      p.add(F, Q, ts, Q);
+      t.put(p);
+      p = new Put(R);
+      p.add(F, Q, ts + 1, Q);
+      t.put(p);
+
+      // these two should be expired but for the override (their ts was 2s in the past);
+      // wait until the observer has been notified of the backup time
+      waitForVersionCount(t, tableName, 2, false);
+
+      TEST_UTIL.flush(tableName);
+      TEST_UTIL.compact(tableName, true);
+
+      // still there?
+      assertEquals(2, countVersions(t));
+
+      zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now), -1);
+      LOG.debug("Set time: " + now);
+
+      // should be gone once a major compaction runs after the observer saw the new time
+      waitForVersionCount(t, tableName, 0, true);
+    } finally {
+      t.close();
+    }
+  }
+
+  /** Returns the number of cells (up to 10 versions) currently visible for row R. */
+  private static int countVersions(HTable t) throws Exception {
+    Get g = new Get(R);
+    g.setMaxVersions(10);
+    Result r = t.get(g);
+    return r.size();
+  }
+
+  /**
+   * Polls until row R has {@code expected} visible cells, failing after
+   * {@link #ZK_UPDATE_TIMEOUT_MS}. When {@code compact} is true, a major compaction
+   * is run before every check.
+   */
+  private static void waitForVersionCount(HTable t, byte[] tableName, int expected,
+      boolean compact) throws Exception {
+    long deadline = System.currentTimeMillis() + ZK_UPDATE_TIMEOUT_MS;
+    int count;
+    while (true) {
+      if (compact) {
+        TEST_UTIL.compact(tableName, true);
+      }
+      count = countVersions(t);
+      if (count == expected || System.currentTimeMillis() > deadline) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+    assertEquals(expected, count);
+  }
+}
+}