You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/10 09:51:38 UTC
[03/24] hbase git commit: HBASE-13202 Procedure v2 - core framework
http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
new file mode 100644
index 0000000..0669549
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureStoreTracker {
+ private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class);
+
+ static class TestProcedure extends Procedure<Void> {
+ public TestProcedure(long procId) {
+ setProcId(procId);
+ }
+
+ @Override
+ protected Procedure[] execute(Void env) { return null; }
+
+ @Override
+ protected void rollback(Void env) { /* no-op */ }
+
+ @Override
+ protected boolean abort(Void env) { return false; }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) { /* no-op */ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) { /* no-op */ }
+ }
+
+ @Test
+ public void testSeqInsertAndDelete() {
+ ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+ assertTrue(tracker.isEmpty());
+
+ final int MIN_PROC = 1;
+ final int MAX_PROC = 1 << 10;
+
+ // sequential insert
+ for (int i = MIN_PROC; i < MAX_PROC; ++i) {
+ tracker.insert(i);
+
+ // All the proc that we inserted should not be deleted
+ for (int j = MIN_PROC; j <= i; ++j) {
+ assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(j));
+ }
+ // All the proc that are not yet inserted should be result as deleted
+ for (int j = i + 1; j < MAX_PROC; ++j) {
+ assertTrue(tracker.isDeleted(j) != ProcedureStoreTracker.DeleteState.NO);
+ }
+ }
+
+ // sequential delete
+ for (int i = MIN_PROC; i < MAX_PROC; ++i) {
+ tracker.delete(i);
+
+ // All the proc that we deleted should be deleted
+ for (int j = MIN_PROC; j <= i; ++j) {
+ assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(j));
+ }
+ // All the proc that are not yet deleted should be result as not deleted
+ for (int j = i + 1; j < MAX_PROC; ++j) {
+ assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(j));
+ }
+ }
+ assertTrue(tracker.isEmpty());
+ }
+
+ @Test
+ public void testPartialTracker() {
+ ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+ tracker.setPartialFlag(true);
+
+ // nothing in the tracker, the state is unknown
+ assertTrue(tracker.isEmpty());
+ assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(1));
+ assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(579));
+
+ // Mark 1 as deleted, now that is a known state
+ tracker.setDeleted(1, true);
+ tracker.dump();
+ assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(1));
+ assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(2));
+ assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(579));
+
+ // Mark 579 as non-deleted, now that is a known state
+ tracker.setDeleted(579, false);
+ assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(1));
+ assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(2));
+ assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(579));
+ assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(577));
+ assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(580));
+ }
+
+ @Test
+ public void testBasicCRUD() {
+ ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+ assertTrue(tracker.isEmpty());
+
+ Procedure[] procs = new TestProcedure[] {
+ new TestProcedure(1), new TestProcedure(2), new TestProcedure(3),
+ new TestProcedure(4), new TestProcedure(5), new TestProcedure(6),
+ };
+
+ tracker.insert(procs[0], null);
+ tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] });
+ assertFalse(tracker.isEmpty());
+ assertTrue(tracker.isUpdated());
+
+ tracker.resetUpdates();
+ assertFalse(tracker.isUpdated());
+
+ for (int i = 0; i < 4; ++i) {
+ tracker.update(procs[i]);
+ assertFalse(tracker.isEmpty());
+ assertFalse(tracker.isUpdated());
+ }
+
+ tracker.update(procs[4]);
+ assertFalse(tracker.isEmpty());
+ assertTrue(tracker.isUpdated());
+
+ tracker.update(procs[5]);
+ assertFalse(tracker.isEmpty());
+ assertTrue(tracker.isUpdated());
+
+ for (int i = 0; i < 5; ++i) {
+ tracker.delete(procs[i].getProcId());
+ assertFalse(tracker.isEmpty());
+ assertTrue(tracker.isUpdated());
+ }
+ tracker.delete(procs[5].getProcId());
+ assertTrue(tracker.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
new file mode 100644
index 0000000..344b28b
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestWALProcedureStore {
+ private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class);
+
+ private static final int PROCEDURE_STORE_SLOTS = 1;
+ private static final Procedure NULL_PROC = null;
+
+ private WALProcedureStore procStore;
+
+ private HBaseCommonTestingUtility htu;
+ private FileSystem fs;
+ private Path testDir;
+ private Path logDir;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ testDir = htu.getDataTestDir();
+ fs = testDir.getFileSystem(htu.getConfiguration());
+ assertTrue(testDir.depth() > 1);
+
+ logDir = new Path(testDir, "proc-logs");
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore.start(PROCEDURE_STORE_SLOTS);
+ procStore.recoverLease();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ procStore.stop(false);
+ fs.delete(logDir, true);
+ }
+
+ private Iterator<Procedure> storeRestart() throws Exception {
+ procStore.stop(false);
+ procStore.start(PROCEDURE_STORE_SLOTS);
+ procStore.recoverLease();
+ return procStore.load();
+ }
+
+ @Test
+ public void testEmptyLogLoad() throws Exception {
+ Iterator<Procedure> loader = storeRestart();
+ assertEquals(0, countProcedures(loader));
+ }
+
+ @Test
+ public void testLoad() throws Exception {
+ Set<Long> procIds = new HashSet<>();
+
+ // Insert something in the log
+ Procedure proc1 = new TestSequentialProcedure();
+ procIds.add(proc1.getProcId());
+ procStore.insert(proc1, null);
+
+ Procedure proc2 = new TestSequentialProcedure();
+ Procedure[] child2 = new Procedure[2];
+ child2[0] = new TestSequentialProcedure();
+ child2[1] = new TestSequentialProcedure();
+
+ procIds.add(proc2.getProcId());
+ procIds.add(child2[0].getProcId());
+ procIds.add(child2[1].getProcId());
+ procStore.insert(proc2, child2);
+
+ // Verify that everything is there
+ verifyProcIdsOnRestart(procIds);
+
+ // Update and delete something
+ procStore.update(proc1);
+ procStore.update(child2[1]);
+ procStore.delete(child2[1].getProcId());
+ procIds.remove(child2[1].getProcId());
+
+ // Verify that everything is there
+ verifyProcIdsOnRestart(procIds);
+
+ // Remove 4 byte from the trailers
+ procStore.stop(false);
+ FileStatus[] logs = fs.listStatus(logDir);
+ assertEquals(3, logs.length);
+ for (int i = 0; i < logs.length; ++i) {
+ corruptLog(logs[i], 4);
+ }
+ verifyProcIdsOnRestart(procIds);
+ }
+
+ @Test
+ public void testCorruptedTrailer() throws Exception {
+ // Insert something
+ for (int i = 0; i < 100; ++i) {
+ procStore.insert(new TestSequentialProcedure(), null);
+ }
+
+ // Stop the store
+ procStore.stop(false);
+
+ // Remove 4 byte from the trailer
+ FileStatus[] logs = fs.listStatus(logDir);
+ assertEquals(1, logs.length);
+ corruptLog(logs[0], 4);
+
+ int count = countProcedures(storeRestart());
+ assertEquals(100, count);
+ }
+
+ @Test
+ public void testCorruptedEntries() throws Exception {
+ // Insert something
+ for (int i = 0; i < 100; ++i) {
+ procStore.insert(new TestSequentialProcedure(), null);
+ }
+
+ // Stop the store
+ procStore.stop(false);
+
+ // Remove some byte from the log
+ // (enough to cut the trailer and corrupt some entries)
+ FileStatus[] logs = fs.listStatus(logDir);
+ assertEquals(1, logs.length);
+ corruptLog(logs[0], 1823);
+
+ int count = countProcedures(storeRestart());
+ assertTrue(procStore.getCorruptedLogs() != null);
+ assertEquals(1, procStore.getCorruptedLogs().size());
+ assertEquals(85, count);
+ }
+
+ private void corruptLog(final FileStatus logFile, final long dropBytes)
+ throws IOException {
+ assertTrue(logFile.getLen() > dropBytes);
+ LOG.debug("corrupt log " + logFile.getPath() +
+ " size=" + logFile.getLen() + " drop=" + dropBytes);
+ Path tmpPath = new Path(testDir, "corrupted.log");
+ InputStream in = fs.open(logFile.getPath());
+ OutputStream out = fs.create(tmpPath);
+ IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
+ fs.rename(tmpPath, logFile.getPath());
+ }
+
+ private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
+ int count = 0;
+ Iterator<Procedure> loader = storeRestart();
+ while (loader.hasNext()) {
+ Procedure proc = loader.next();
+ LOG.debug("loading procId=" + proc.getProcId());
+ assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
+ count++;
+ }
+ assertEquals(procIds.size(), count);
+ }
+
+ private void assertIsEmpty(Iterator<Procedure> iterator) {
+ assertEquals(0, countProcedures(iterator));
+ }
+
+ private int countProcedures(Iterator<Procedure> iterator) {
+ int count = 0;
+ while (iterator.hasNext()) {
+ Procedure proc = iterator.next();
+ LOG.trace("loading procId=" + proc.getProcId());
+ count++;
+ }
+ return count;
+ }
+
+ private void assertEmptyLogDir() {
+ try {
+ FileStatus[] status = fs.listStatus(logDir);
+ assertTrue("expected empty state-log dir", status == null || status.length == 0);
+ } catch (FileNotFoundException e) {
+ fail("expected the state-log dir to be present: " + logDir);
+ } catch (IOException e) {
+ fail("got en exception on state-log dir list: " + e.getMessage());
+ }
+ }
+
+ public static class TestSequentialProcedure extends SequentialProcedure<Void> {
+ private static long seqid = 0;
+
+ public TestSequentialProcedure() {
+ setProcId(++seqid);
+ }
+
+ @Override
+ protected Procedure[] execute(Void env) { return null; }
+
+ @Override
+ protected void rollback(Void env) { }
+
+ @Override
+ protected boolean abort(Void env) { return false; }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ long procId = getProcId();
+ if (procId % 2 == 0) {
+ stream.write(Bytes.toBytes(procId));
+ }
+ }
+
+ @Override
+ protected void deserializeStateData(InputStream stream) throws IOException {
+ long procId = getProcId();
+ if (procId % 2 == 0) {
+ byte[] bProcId = new byte[8];
+ assertEquals(8, stream.read(bProcId));
+ assertEquals(procId, Bytes.toLong(bProcId));
+ } else {
+ assertEquals(0, stream.available());
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
new file mode 100644
index 0000000..aff536a
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestTimeoutBlockingQueue {
+ private static final Log LOG = LogFactory.getLog(TestTimeoutBlockingQueue.class);
+
+ static class TestObject {
+ private long timeout;
+ private int seqId;
+
+ public TestObject(int seqId, long timeout) {
+ this.timeout = timeout;
+ this.seqId = seqId;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public String toString() {
+ return String.format("(%03d, %03d)", seqId, timeout);
+ }
+ }
+
+ static class TestObjectTimeoutRetriever implements TimeoutRetriever<TestObject> {
+ @Override
+ public long getTimeout(TestObject obj) {
+ return obj.getTimeout();
+ }
+
+ @Override
+ public TimeUnit getTimeUnit(TestObject obj) {
+ return TimeUnit.MILLISECONDS;
+ }
+ }
+
+ @Test
+ public void testOrder() {
+ TimeoutBlockingQueue<TestObject> queue =
+ new TimeoutBlockingQueue<TestObject>(8, new TestObjectTimeoutRetriever());
+
+ long[] timeouts = new long[] {500, 200, 700, 300, 600, 600, 200, 800, 500};
+
+ for (int i = 0; i < timeouts.length; ++i) {
+ for (int j = 0; j <= i; ++j) {
+ queue.add(new TestObject(j, timeouts[j]));
+ queue.dump();
+ }
+
+ long prev = 0;
+ for (int j = 0; j <= i; ++j) {
+ TestObject obj = queue.poll();
+ assertTrue(obj.getTimeout() >= prev);
+ prev = obj.getTimeout();
+ queue.dump();
+ }
+ }
+ }
+
+ @Test
+ public void testTimeoutBlockingQueue() {
+ TimeoutBlockingQueue<TestObject> queue;
+
+ int[][] testArray = new int[][] {
+ {200, 400, 600}, // append
+ {200, 400, 100}, // prepend
+ {200, 400, 300}, // insert
+ };
+
+ for (int i = 0; i < testArray.length; ++i) {
+ int[] sortedArray = Arrays.copyOf(testArray[i], testArray[i].length);
+ Arrays.sort(sortedArray);
+
+ // test with head == 0
+ queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
+ for (int j = 0; j < testArray[i].length; ++j) {
+ queue.add(new TestObject(j, testArray[i][j]));
+ queue.dump();
+ }
+
+ for (int j = 0; !queue.isEmpty(); ++j) {
+ assertEquals(sortedArray[j], queue.poll().getTimeout());
+ }
+
+ queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
+ queue.add(new TestObject(0, 50));
+ assertEquals(50, queue.poll().getTimeout());
+
+ // test with head > 0
+ for (int j = 0; j < testArray[i].length; ++j) {
+ queue.add(new TestObject(j, testArray[i][j]));
+ queue.dump();
+ }
+
+ for (int j = 0; !queue.isEmpty(); ++j) {
+ assertEquals(sortedArray[j], queue.poll().getTimeout());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-protocol/pom.xml b/hbase-protocol/pom.xml
index 7787c52..0d33332 100644
--- a/hbase-protocol/pom.xml
+++ b/hbase-protocol/pom.xml
@@ -176,6 +176,7 @@
<include>MapReduce.proto</include>
<include>Master.proto</include>
<include>MultiRowMutation.proto</include>
+ <include>Procedure.proto</include>
<include>Quota.proto</include>
<include>RegionServerStatus.proto</include>
<include>RowProcessor.proto</include>