You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/10 09:51:38 UTC

[03/24] hbase git commit: HBASE-13202 Procedure v2 - core framework

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
new file mode 100644
index 0000000..0669549
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureStoreTracker {
+  private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class);
+
+  static class TestProcedure extends Procedure<Void> {  // stub: carries a procId, does no work
+    public TestProcedure(long procId) {
+      setProcId(procId);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) { return null; }  // nothing to run
+
+    @Override
+    protected void rollback(Void env) { /* no-op */ }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) { /* no-op: no state to write */ }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) { /* no-op: no state to read */ }
+  }
+
+  @Test
+  public void testSeqInsertAndDelete() {
+    final int firstProcId = 1;
+    final int endProcId = 1 << 10;  // exclusive upper bound
+
+    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    assertTrue(tracker.isEmpty());
+
+    // Phase 1: insert the ids in ascending order, re-checking the whole range after each one
+    for (int inserted = firstProcId; inserted < endProcId; ++inserted) {
+      tracker.insert(inserted);
+
+      // ids up to and including the last insert must be reported as not deleted
+      for (int procId = firstProcId; procId <= inserted; ++procId) {
+        assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
+      }
+      // ids that were not inserted yet must never be reported as not deleted
+      for (int procId = inserted + 1; procId < endProcId; ++procId) {
+        assertFalse(tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.NO);
+      }
+    }
+
+    // Phase 2: delete the ids in the same order, again re-checking the whole range
+    for (int removed = firstProcId; removed < endProcId; ++removed) {
+      tracker.delete(removed);
+
+      // everything deleted so far must be reported as deleted
+      for (int procId = firstProcId; procId <= removed; ++procId) {
+        assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
+      }
+      // ids still waiting to be deleted must be reported as not deleted
+      for (int procId = removed + 1; procId < endProcId; ++procId) {
+        assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
+      }
+    }
+    assertTrue(tracker.isEmpty());
+  }
+
+  @Test
+  public void testPartialTracker() {
+    final ProcedureStoreTracker.DeleteState yes = ProcedureStoreTracker.DeleteState.YES;
+    final ProcedureStoreTracker.DeleteState no = ProcedureStoreTracker.DeleteState.NO;
+    final ProcedureStoreTracker.DeleteState maybe = ProcedureStoreTracker.DeleteState.MAYBE;
+    ProcedureStoreTracker partial = new ProcedureStoreTracker();
+    partial.setPartialFlag(true);
+    // An empty tracker with the partial flag set answers MAYBE for any procId
+    assertTrue(partial.isEmpty());
+    assertEquals(maybe, partial.isDeleted(1));
+    assertEquals(maybe, partial.isDeleted(579));
+    // Flag 1 as deleted: its state becomes known, the other ids stay MAYBE
+    partial.setDeleted(1, true);
+    partial.dump();
+    assertEquals(yes, partial.isDeleted(1));
+    assertEquals(maybe, partial.isDeleted(2));
+    assertEquals(maybe, partial.isDeleted(579));
+    // Flag 579 as not deleted: only 579 itself becomes known, its neighbours stay MAYBE
+    partial.setDeleted(579, false);
+    assertEquals(yes, partial.isDeleted(1));
+    assertEquals(maybe, partial.isDeleted(2));
+    assertEquals(no, partial.isDeleted(579));
+    assertEquals(maybe, partial.isDeleted(577));
+    assertEquals(maybe, partial.isDeleted(580));
+  }
+
+  @Test
+  public void testBasicCRUD() {
+    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    assertTrue(tracker.isEmpty());
+
+    Procedure[] procs = new TestProcedure[6];  // procIds 1..6 live at indexes 0..5
+    for (int slot = 0; slot < procs.length; ++slot) {
+      procs[slot] = new TestProcedure(slot + 1);
+    }
+
+    tracker.insert(procs[0], null);
+    tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] });
+    assertFalse(tracker.isEmpty());
+    assertTrue(tracker.isUpdated());
+
+    tracker.resetUpdates();
+    assertFalse(tracker.isUpdated());
+    // After the reset, updating procs[0..3] alone must not flip isUpdated() back to true
+    for (int slot = 0; slot <= 3; ++slot) {
+      tracker.update(procs[slot]);
+      assertFalse(tracker.isEmpty());
+      assertFalse(tracker.isUpdated());
+    }
+
+    tracker.update(procs[4]);
+    assertFalse(tracker.isEmpty());
+    assertTrue(tracker.isUpdated());
+    // procs[5] was never inserted; this update is the first time the tracker sees it
+    tracker.update(procs[5]);
+    assertFalse(tracker.isEmpty());
+    assertTrue(tracker.isUpdated());
+    // The tracker must stay non-empty until the very last procedure is deleted
+    for (int slot = 0; slot <= 4; ++slot) {
+      tracker.delete(procs[slot].getProcId());
+      assertFalse(tracker.isEmpty());
+      assertTrue(tracker.isUpdated());
+    }
+    tracker.delete(procs[5].getProcId());
+    assertTrue(tracker.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
new file mode 100644
index 0000000..344b28b
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestWALProcedureStore {
+  private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class);
+
+  private static final int PROCEDURE_STORE_SLOTS = 1;
+  private static final Procedure NULL_PROC = null;
+
+  private WALProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  @Before
+  public void setUp() throws IOException {  // creates and starts a store before every test
+    htu = new HBaseCommonTestingUtility();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+    // the store under test keeps its logs in <testDir>/proc-logs, which tearDown() deletes
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore.start(PROCEDURE_STORE_SLOTS);
+    procStore.recoverLease();
+  }
+
+  @After
+  public void tearDown() throws IOException {  // stops the store, then removes its log dir
+    procStore.stop(false);
+    fs.delete(logDir, true);  // true = recursive
+  }
+
+  private Iterator<Procedure> storeRestart() throws Exception {  // stop + start, then load()
+    procStore.stop(false);
+    procStore.start(PROCEDURE_STORE_SLOTS);
+    procStore.recoverLease();
+    return procStore.load();  // iterator over whatever the restarted store loads
+  }
+
+  @Test
+  public void testEmptyLogLoad() throws Exception {
+    // nothing was inserted, so a restart must load no procedures
+    assertEquals(0, countProcedures(storeRestart()));
+  }
+
+  @Test
+  public void testLoad() throws Exception {
+    Set<Long> procIds = new HashSet<>();
+
+    // Insert one procedure on its own; procIds collects every id expected after a restart
+    Procedure proc1 = new TestSequentialProcedure();
+    procIds.add(proc1.getProcId());
+    procStore.insert(proc1, null);
+    // Insert a second procedure together with the two procedures in child2
+    Procedure proc2 = new TestSequentialProcedure();
+    Procedure[] child2 = new Procedure[2];
+    child2[0] = new TestSequentialProcedure();
+    child2[1] = new TestSequentialProcedure();
+
+    procIds.add(proc2.getProcId());
+    procIds.add(child2[0].getProcId());
+    procIds.add(child2[1].getProcId());
+    procStore.insert(proc2, child2);
+
+    // Restart the store and verify that all four procedures are loaded
+    verifyProcIdsOnRestart(procIds);
+
+    // Update two procedures, then delete one of them and stop expecting it
+    procStore.update(proc1);
+    procStore.update(child2[1]);
+    procStore.delete(child2[1].getProcId());
+    procIds.remove(child2[1].getProcId());
+
+    // Restart again and verify that only the remaining procedures are loaded
+    verifyProcIdsOnRestart(procIds);
+
+    // Drop the last 4 bytes of every log file; the same procedures must still load
+    procStore.stop(false);
+    FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(3, logs.length);  // presumably one log per store start so far - TODO confirm
+    for (int i = 0; i < logs.length; ++i) {
+      corruptLog(logs[i], 4);
+    }
+    verifyProcIdsOnRestart(procIds);
+  }
+
+  @Test
+  public void testCorruptedTrailer() throws Exception {
+    final int numProcs = 100;
+    for (int inserted = 0; inserted != numProcs; ++inserted) {
+      procStore.insert(new TestSequentialProcedure(), null);
+    }
+
+    // Stop the store before rewriting its log
+    procStore.stop(false);
+
+    // Chop the last 4 bytes off the only log file
+    FileStatus[] walFiles = fs.listStatus(logDir);
+    assertEquals(1, walFiles.length);
+    corruptLog(walFiles[0], 4);
+
+    // Every inserted procedure must still be loaded after the restart
+    assertEquals(numProcs, countProcedures(storeRestart()));
+  }
+
+  @Test
+  public void testCorruptedEntries() throws Exception {
+    // Write 100 procedures to the log
+    for (int left = 100; left > 0; --left) {
+      procStore.insert(new TestSequentialProcedure(), null);
+    }
+
+    // Stop the store before rewriting its log
+    procStore.stop(false);
+
+    // Truncate 1823 bytes from the end of the single log file
+    // (enough to cut the trailer and corrupt some entries)
+    FileStatus[] walFiles = fs.listStatus(logDir);
+    assertEquals(1, walFiles.length);
+    corruptLog(walFiles[0], 1823);
+
+    final int loaded = countProcedures(storeRestart());
+    assertFalse(procStore.getCorruptedLogs() == null);
+    assertEquals(1, procStore.getCorruptedLogs().size());
+    assertEquals(85, loaded);
+  }
+
+  // Truncates logFile by dropBytes: copies its leading bytes to a temp file, then renames it back.
+  private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException {
+    assertTrue(logFile.getLen() > dropBytes);
+    LOG.debug("corrupt log " + logFile.getPath() +
+              " size=" + logFile.getLen() + " drop=" + dropBytes);
+    Path tmpPath = new Path(testDir, "corrupted.log");
+    InputStream in = fs.open(logFile.getPath());
+    OutputStream out = fs.create(tmpPath);
+    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);  // true = close both streams
+    // a failed rename would leave the log intact and let the corruption tests pass vacuously
+    assertTrue("unable to replace " + logFile.getPath(), fs.rename(tmpPath, logFile.getPath()));
+  }
+
+  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
+    // Restarts the store; every procedure it loads must be in procIds and the
+    // number of loaded procedures must equal procIds.size()
+    int loaded = 0;
+    for (Iterator<Procedure> it = storeRestart(); it.hasNext(); ++loaded) {
+      Procedure next = it.next();
+      LOG.debug("loading procId=" + next.getProcId());
+      assertTrue("procId=" + next.getProcId() + " unexpected", procIds.contains(next.getProcId()));
+    }
+    assertEquals(procIds.size(), loaded);
+  }
+
+  private void assertIsEmpty(Iterator<Procedure> iterator) {  // drains iterator; expects 0 items
+    assertEquals(0, countProcedures(iterator));
+  }
+
+  private int countProcedures(Iterator<Procedure> iterator) {
+    // Walks the iterator to its end, tracing every procId on the way,
+    // and returns the number of procedures seen
+    int total = 0;
+    for (; iterator.hasNext(); ++total) {
+      LOG.trace("loading procId=" + iterator.next().getProcId());
+    }
+    return total;
+  }
+
+  private void assertEmptyLogDir() {  // fails if listing logDir throws or returns any entry
+    try {
+      FileStatus[] status = fs.listStatus(logDir);
+      assertTrue("expected empty state-log dir", status == null || status.length == 0);
+    } catch (FileNotFoundException e) {
+      fail("expected the state-log dir to be present: " + logDir);
+    } catch (IOException e) {
+      fail("got an exception on state-log dir list: " + e.getMessage());
+    }
+  }
+
+  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
+    private static long seqid = 0;  // last procId handed out; shared by all instances
+
+    public TestSequentialProcedure() {
+      setProcId(++seqid);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) { return null; }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {
+      long procId = getProcId();
+      if (procId % 2 == 0) {  // only even procIds write state: their own id as bytes
+        stream.write(Bytes.toBytes(procId));
+      }
+    }
+
+    @Override
+    protected void deserializeStateData(InputStream stream) throws IOException {
+      long procId = getProcId();
+      if (procId % 2 == 0) {  // even procIds must read back the id they serialized
+        byte[] bProcId = new byte[8];
+        IOUtils.readFully(stream, bProcId, 0, bProcId.length);  // read() alone may come up short
+        assertEquals(procId, Bytes.toLong(bProcId));
+      } else {
+        assertEquals(0, stream.available());
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
new file mode 100644
index 0000000..aff536a
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestTimeoutBlockingQueue {
+  private static final Log LOG = LogFactory.getLog(TestTimeoutBlockingQueue.class);
+
+  static class TestObject {
+    private final long timeout;
+    private final int seqId;
+
+    public TestObject(int seqId, long timeout) {
+      this.timeout = timeout;
+      this.seqId = seqId;
+    }
+
+    public long getTimeout() { return timeout; }
+
+    // Rendered as "(seqId, timeout)", each value zero-padded to at least three digits
+    @Override
+    public String toString() {
+      return String.format("(%03d, %03d)", seqId, timeout);
+    }
+  }
+
+  static class TestObjectTimeoutRetriever implements TimeoutRetriever<TestObject> {
+    @Override
+    public long getTimeout(TestObject obj) {
+      return obj.getTimeout();  // the timeout is whatever the TestObject was built with
+    }
+
+    @Override
+    public TimeUnit getTimeUnit(TestObject obj) {
+      return TimeUnit.MILLISECONDS;  // every TestObject timeout is reported in milliseconds
+    }
+  }
+
+  @Test
+  public void testOrder() {
+    final long[] timeouts = {500, 200, 700, 300, 600, 600, 200, 800, 500};
+    TimeoutBlockingQueue<TestObject> queue =
+      new TimeoutBlockingQueue<TestObject>(8, new TestObjectTimeoutRetriever());
+
+    // For every prefix of timeouts: enqueue it, then drain it expecting non-decreasing values
+    for (int count = 1; count <= timeouts.length; ++count) {
+      for (int seqId = 0; seqId < count; ++seqId) {
+        queue.add(new TestObject(seqId, timeouts[seqId]));
+        queue.dump();
+      }
+
+      long lastTimeout = 0;
+      for (int polled = 0; polled < count; ++polled) {
+        TestObject obj = queue.poll();
+        assertTrue(obj.getTimeout() >= lastTimeout);
+        lastTimeout = obj.getTimeout();
+        queue.dump();
+      }
+    }
+  }
+
+  @Test
+  public void testTimeoutBlockingQueue() {
+    final int[][] scenarios = {
+      {200, 400, 600},  // append
+      {200, 400, 100},  // prepend
+      {200, 400, 300},  // insert
+    };
+
+    for (int[] timeouts : scenarios) {
+      int[] expected = timeouts.clone();
+      Arrays.sort(expected);
+
+      // test with head == 0
+      TimeoutBlockingQueue<TestObject> queue =
+        new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
+      for (int seqId = 0; seqId < timeouts.length; ++seqId) {
+        queue.add(new TestObject(seqId, timeouts[seqId]));
+        queue.dump();
+      }
+
+      for (int next = 0; !queue.isEmpty(); ++next) {
+        assertEquals(expected[next], queue.poll().getTimeout());
+      }
+
+      // one add/poll cycle on a fresh queue, so the adds below hit a queue already polled once
+      queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
+      queue.add(new TestObject(0, 50));
+      assertEquals(50, queue.poll().getTimeout());
+
+      // test with head > 0
+      for (int seqId = 0; seqId < timeouts.length; ++seqId) {
+        queue.add(new TestObject(seqId, timeouts[seqId]));
+        queue.dump();
+      }
+
+      for (int next = 0; !queue.isEmpty(); ++next) {
+        assertEquals(expected[next], queue.poll().getTimeout());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01c36ab/hbase-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-protocol/pom.xml b/hbase-protocol/pom.xml
index 7787c52..0d33332 100644
--- a/hbase-protocol/pom.xml
+++ b/hbase-protocol/pom.xml
@@ -176,6 +176,7 @@
                           <include>MapReduce.proto</include>
                           <include>Master.proto</include>
                           <include>MultiRowMutation.proto</include>
+                          <include>Procedure.proto</include>
                           <include>Quota.proto</include>
                           <include>RegionServerStatus.proto</include>
                           <include>RowProcessor.proto</include>