You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/25 02:01:47 UTC
svn commit: r1377156 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/coprocessor/example/
test/java/org/apache/hadoop/hbase/coprocessor/example/
Author: larsh
Date: Sat Aug 25 00:01:46 2012
New Revision: 1377156
URL: http://svn.apache.org/viewvc?rev=1377156&view=rev
Log:
HBASE-6496 Example ZK based scan policy
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java?rev=1377156&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java Sat Aug 25 00:01:46 2012
@@ -0,0 +1,232 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This is an example showing how a RegionObserver could be configured
+ * via ZooKeeper in order to control a Region's compaction, flush, and scan policy.
+ *
+ * This also demonstrates the use of shared {@link RegionObserver} state.
+ * See {@link RegionCoprocessorEnvironment#getSharedData()}.
+ *
+ * This would be useful for an incremental backup tool, which would indicate the last
+ * time of a successful backup via ZK and instruct HBase to not delete data that was
+ * inserted since (based on wall clock time).
+ *
+ * This implements org.apache.zookeeper.Watcher directly instead of using
+ * {@link ZooKeeperWatcher}, because RegionObservers come and go and currently
+ * listeners registered with ZooKeeperWatcher cannot be removed.
+ */
+public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
+ // ZooKeeper path of the znode that the nested ZKWatcher reads and watches.
+ public static String node = "/backup/example/lastbackup";
+ // Key under which start() stores the ZKWatcher in the coprocessor's shared data.
+ public static String zkkey = "ZK";
+ // Logger used by this observer and its nested watcher.
+ private static final Log LOG = LogFactory.getLog(ZooKeeperScanPolicyObserver.class);
+
+ /**
+  * Internal watcher that keeps "data" (the payload of the znode at
+  * {@link ZooKeeperScanPolicyObserver#node}) up to date asynchronously.
+  *
+  * "data" is assigned by the watch callback {@link #process(WatchedEvent)} and read through
+  * {@link #getData()} by whichever thread asks for it, so the field is volatile to make
+  * every update visible to readers.
+  */
+ private static class ZKWatcher implements Watcher {
+   private volatile byte[] data = null;
+   private final ZooKeeper zk;
+   private volatile boolean needSetup = true;
+   private volatile long lastSetupTry = 0;
+
+   /**
+    * @param zk the ZooKeeper handle used for all reads and watch registrations
+    */
+   public ZKWatcher(ZooKeeper zk) {
+     this.zk = zk;
+     // trigger the listening
+     getData();
+   }
+
+   /**
+    * Get the maintained data. In case of any ZK exceptions this will retry
+    * establishing the connection (but not more than twice/minute).
+    *
+    * getData is on the critical path, so make sure it is fast unless there is
+    * a problem (network partition, ZK ensemble down, etc).
+    * Make sure at most one (unlucky) thread retries and other threads don't pile up
+    * while that thread tries to recreate the connection.
+    *
+    * @return the last known version of the data, or null if the node does not exist
+    *         or was never read successfully
+    */
+   public byte[] getData() {
+     // try at most twice/minute
+     if (needSetup && EnvironmentEdgeManager.currentTimeMillis() > lastSetupTry + 30000) {
+       synchronized (this) {
+         // make sure only one thread tries to reconnect
+         if (!needSetup) {
+           return data;
+         }
+         needSetup = false;
+       }
+       // do this without the lock held to avoid threads piling up on this lock,
+       // as it can take a while
+       try {
+         LOG.debug("Connecting to ZK");
+         // record this attempt
+         lastSetupTry = EnvironmentEdgeManager.currentTimeMillis();
+         refresh();
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Read synchronously: " + describe(data));
+         }
+       } catch (InterruptedException ix) {
+         // keep the interrupt visible to the caller and try again later
+         Thread.currentThread().interrupt();
+         needSetup = true;
+       } catch (Exception x) {
+         // try again if this fails
+         LOG.warn("Failed to read " + node + " from ZK, will retry", x);
+         needSetup = true;
+       }
+     }
+     return data;
+   }
+
+   /**
+    * Registers this watcher on the node and synchronizes "data" with the node's current
+    * state: its payload if the node exists, null otherwise.
+    *
+    * The watched exists() call comes first so that a watch is in place whether or not the
+    * node exists; a node that shows up concurrently is then either seen by exists() or
+    * reported through a later NodeCreated event.
+    */
+   private void refresh() throws KeeperException, InterruptedException {
+     if (zk.exists(node, this) != null) {
+       data = zk.getData(node, this, null);
+     } else {
+       data = null;
+     }
+   }
+
+   /**
+    * Renders a payload for log messages without assuming it is a well-formed long.
+    */
+   private static String describe(byte[] value) {
+     if (value == null) {
+       return "null";
+     }
+     if (value.length == Bytes.SIZEOF_LONG) {
+       return String.valueOf(Bytes.toLong(value));
+     }
+     return Bytes.toStringBinary(value);
+   }
+
+   /**
+    * Watch callback: re-reads the node (and re-registers the watch) on create, change and
+    * delete events; every other event type is ignored.
+    *
+    * If the read fails or is interrupted the watch may not have been re-registered, so
+    * needSetup is raised to let the next {@link #getData()} call set it up again.
+    */
+   @Override
+   public void process(WatchedEvent event) {
+     try {
+       switch (event.getType()) {
+       case NodeDataChanged:
+       case NodeCreated:
+         // get data and re-watch
+         data = zk.getData(node, this, null);
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Read asynchronously: " + describe(data));
+         }
+         break;
+
+       case NodeDeleted:
+         // re-watch; the node may already have been recreated, in which case no
+         // NodeCreated event will follow and the data has to be read right away
+         refresh();
+         break;
+
+       default:
+         // ignore
+       }
+     } catch (InterruptedException ix) {
+       // do not swallow the interrupt; the owner of this thread decides what to do with it
+       Thread.currentThread().interrupt();
+       needSetup = true;
+     } catch (KeeperException kx) {
+       LOG.warn("Failed to refresh " + node + " after " + event.getType() + ", will retry", kx);
+       needSetup = true;
+     }
+   }
+ }
+
+ /**
+  * Installs a single {@link ZKWatcher} in the coprocessor's shared data (under
+  * {@link #zkkey}) unless one is already present.
+  *
+  * @param e the coprocessor environment; cast to {@link RegionCoprocessorEnvironment}
+  */
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
+   RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
+   if (regionEnv.getSharedData().containsKey(zkkey)) {
+     // a watcher is already installed
+     return;
+   }
+   // The check above and the insert below are not atomic. If two callers get here
+   // together, the loser's watcher is dropped by putIfAbsent; in the worst case it
+   // will be notified once.
+   ZooKeeper zk = regionEnv.getRegionServerServices().getZooKeeper()
+       .getRecoverableZooKeeper().getZooKeeper();
+   regionEnv.getSharedData().putIfAbsent(zkkey, new ZKWatcher(zk));
+ }
+
+ /**
+ * Coprocessor shutdown hook; intentionally does nothing.
+ *
+ * @param e the coprocessor environment (unused)
+ */
+ @Override
+ public void stop(CoprocessorEnvironment e) throws IOException {
+ // nothing to do here: this method releases nothing, so anything start() placed in
+ // the shared data stays there
+ }
+
+ /**
+  * Builds the {@link ScanInfo} to use instead of the store's own one, widening the TTL so
+  * that it reaches back at least to the timestamp published in ZooKeeper.
+  *
+  * @param store the store about to be flushed, compacted or scanned
+  * @param e the environment whose shared data holds the {@link ZKWatcher}
+  * @return the replacement scan info, or null when the default behavior should be kept
+  *         (no timestamp available, or the store's TTL is Long.MAX_VALUE)
+  */
+ protected ScanInfo getScanInfo(HStore store, RegionCoprocessorEnvironment e) {
+   ZKWatcher watcher = (ZKWatcher) e.getSharedData().get(zkkey);
+   byte[] published = watcher.getData();
+   if (published == null) {
+     return null;
+   }
+   ScanInfo current = store.getScanInfo();
+   long configuredTtl = current.getTtl();
+   if (configuredTtl == Long.MAX_VALUE) {
+     return null;
+   }
+   // age of the published timestamp, measured against the current wall clock
+   long age = EnvironmentEdgeManager.currentTimeMillis() - Bytes.toLong(published);
+   // never go below the TTL the store is configured with
+   long effectiveTtl = Math.max(age, configuredTtl);
+   return new ScanInfo(store.getFamily(), effectiveTtl,
+       current.getTimeToPurgeDeletes(), current.getComparator());
+ }
+
+ /**
+  * Replaces the flush scanner with one built from the {@link ScanInfo} returned by
+  * {@link #getScanInfo(HStore, RegionCoprocessorEnvironment)}.
+  *
+  * @return the replacement scanner, or null to let the default flush scanner be used
+  */
+ @Override
+ public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+     HStore store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+   final ScanInfo overridden = getScanInfo(store, c.getEnvironment());
+   if (overridden == null) {
+     // take default action
+     return null;
+   }
+   final Scan flushScan = new Scan();
+   flushScan.setMaxVersions(overridden.getMaxVersions());
+   final List<KeyValueScanner> sources = Collections.singletonList(memstoreScanner);
+   final long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+   return new StoreScanner(store, overridden, flushScan, sources, ScanType.MINOR_COMPACT,
+       smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+ }
+
+ /**
+  * Replaces the compaction scanner with one built from the {@link ScanInfo} returned by
+  * {@link #getScanInfo(HStore, RegionCoprocessorEnvironment)}.
+  *
+  * @return the replacement scanner, or null to let the default compaction scanner be used
+  */
+ @Override
+ public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+     HStore store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+     InternalScanner s) throws IOException {
+   final ScanInfo overridden = getScanInfo(store, c.getEnvironment());
+   if (overridden == null) {
+     // take default action
+     return null;
+   }
+   final Scan compactionScan = new Scan();
+   compactionScan.setMaxVersions(overridden.getMaxVersions());
+   final long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+   return new StoreScanner(store, overridden, compactionScan, scanners, scanType,
+       smallestReadPoint, earliestPutTs);
+ }
+
+ /**
+  * Replaces the store scanner of a client scan with one built from the {@link ScanInfo}
+  * returned by {@link #getScanInfo(HStore, RegionCoprocessorEnvironment)}.
+  *
+  * @return the replacement scanner, or null to let the default store scanner be used
+  */
+ @Override
+ public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+     final HStore store, final Scan scan, final NavigableSet<byte[]> targetCols,
+     final KeyValueScanner s) throws IOException {
+   final ScanInfo overridden = getScanInfo(store, c.getEnvironment());
+   // a null result means: take default action
+   return overridden == null ? null : new StoreScanner(store, overridden, scan, targetCols);
+ }
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java?rev=1377156&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java Sat Aug 25 00:01:46 2012
@@ -0,0 +1,124 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestZooKeeperScanPolicyObserver {
+ // Logger for this test class.
+ private static final Log LOG = LogFactory.getLog(TestZooKeeperScanPolicyObserver.class);
+ // Testing utility that starts and stops the mini cluster and provides its configuration.
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ // Column family, qualifier and row key used by the test.
+ private static final byte[] F = Bytes.toBytes("fam");
+ private static final byte[] Q = Bytes.toBytes("qual");
+ private static final byte[] R = Bytes.toBytes("row");
+
+ /**
+  * Registers {@link ZooKeeperScanPolicyObserver} as a region coprocessor in the test
+  * configuration, then starts the mini ZK cluster followed by the mini cluster.
+  */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+   // the coprocessor has to be in the configuration before anything is started
+   TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+       ZooKeeperScanPolicyObserver.class.getName());
+   // start the ZK cluster by itself first, then the rest of the cluster
+   TEST_UTIL.startMiniZKCluster();
+   TEST_UTIL.startMiniCluster();
+ }
+
+ /** Shuts down the mini cluster that setUpBeforeClass() started. */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Verifies that cells older than the family's TTL survive gets, a flush and a major
+  * compaction while the timestamp published in ZooKeeper is older than they are, and that
+  * they are removed by a compaction once the published timestamp has moved past them.
+  */
+ @Test
+ public void testScanPolicyObserver() throws Exception {
+   byte[] tableName = Bytes.toBytes("testScanPolicyObserver");
+   HTableDescriptor desc = new HTableDescriptor(tableName);
+   HColumnDescriptor hcd = new HColumnDescriptor(F)
+       .setMaxVersions(10)
+       .setTimeToLive(1);
+   desc.addFamily(hcd);
+   TEST_UTIL.getHBaseAdmin().createTable(desc);
+   HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+   // close the table even when an assertion or a cluster call fails
+   try {
+     long now = EnvironmentEdgeManager.currentTimeMillis();
+
+     ZooKeeperWatcher zkw = HConnectionManager.getConnection(TEST_UTIL.getConfiguration())
+         .getZooKeeperWatcher();
+     ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper();
+     ZKUtil.createWithParents(zkw, ZooKeeperScanPolicyObserver.node);
+     // let's say test last backup was 1h ago
+     // using plain ZK here, because RecoverableZooKeeper add extra encoding to the data
+     zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now - 3600*1000), -1);
+
+     LOG.debug("Set time: "+Bytes.toLong(Bytes.toBytes(now - 3600*1000)));
+
+     long ts = now - 2000;
+     Put p = new Put(R);
+     p.add(F, Q, ts, Q);
+     t.put(p);
+     p = new Put(R);
+     p.add(F, Q, ts+1, Q);
+     t.put(p);
+
+     // these two should be expired but for the override
+     // (their ts was 2s in the past)
+     Get g = new Get(R);
+     g.setMaxVersions(10);
+     Result r = t.get(g);
+     // still there?
+     assertEquals(2, r.size());
+
+     TEST_UTIL.flush(tableName);
+     TEST_UTIL.compact(tableName, true);
+
+     g = new Get(R);
+     g.setMaxVersions(10);
+     r = t.get(g);
+     // still there?
+     assertEquals(2, r.size());
+     zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now), -1);
+     LOG.debug("Set time: "+now);
+
+     TEST_UTIL.compact(tableName, true);
+
+     g = new Get(R);
+     g.setMaxVersions(10);
+     r = t.get(g);
+     // should be gone now
+     assertEquals(0, r.size());
+   } finally {
+     t.close();
+   }
+ }
+}