You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/09/08 17:35:53 UTC
[4/5] hbase git commit: HBASE-18106 Redo ProcedureInfo and LockInfo
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
new file mode 100644
index 0000000..0c12648
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.shaded.protobuf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Any;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BytesValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.DeleteType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestProtobufUtil {
+ public TestProtobufUtil() {
+ }
+
+ @Test
+ public void testException() throws IOException {
+ NameBytesPair.Builder builder = NameBytesPair.newBuilder();
+ final String omg = "OMG!!!";
+ builder.setName("java.io.IOException");
+ builder.setValue(ByteString.copyFrom(Bytes.toBytes(omg)));
+ Throwable t = ProtobufUtil.toException(builder.build());
+ assertEquals(omg, t.getMessage());
+ builder.clear();
+ builder.setName("org.apache.hadoop.ipc.RemoteException");
+ builder.setValue(ByteString.copyFrom(Bytes.toBytes(omg)));
+ t = ProtobufUtil.toException(builder.build());
+ assertEquals(omg, t.getMessage());
+ }
+
+ /**
+ * Test basic Get conversions.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testGet() throws IOException {
+ ClientProtos.Get.Builder getBuilder = ClientProtos.Get.newBuilder();
+ getBuilder.setRow(ByteString.copyFromUtf8("row"));
+ Column.Builder columnBuilder = Column.newBuilder();
+ columnBuilder.setFamily(ByteString.copyFromUtf8("f1"));
+ columnBuilder.addQualifier(ByteString.copyFromUtf8("c1"));
+ columnBuilder.addQualifier(ByteString.copyFromUtf8("c2"));
+ getBuilder.addColumn(columnBuilder.build());
+
+ columnBuilder.clear();
+ columnBuilder.setFamily(ByteString.copyFromUtf8("f2"));
+ getBuilder.addColumn(columnBuilder.build());
+ getBuilder.setLoadColumnFamiliesOnDemand(true);
+ ClientProtos.Get proto = getBuilder.build();
+ // default fields
+ assertEquals(1, proto.getMaxVersions());
+ assertEquals(true, proto.getCacheBlocks());
+
+ // set the default value for equal comparison
+ getBuilder = ClientProtos.Get.newBuilder(proto);
+ getBuilder.setMaxVersions(1);
+ getBuilder.setCacheBlocks(true);
+
+ Get get = ProtobufUtil.toGet(proto);
+ assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
+ }
+
+ /**
+ * Test Delete Mutate conversions.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testDelete() throws IOException {
+ MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
+ mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
+ mutateBuilder.setMutateType(MutationType.DELETE);
+ mutateBuilder.setTimestamp(111111);
+ ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
+ valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
+ QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
+ qualifierBuilder.setDeleteType(DeleteType.DELETE_ONE_VERSION);
+ qualifierBuilder.setTimestamp(111222);
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
+ qualifierBuilder.setDeleteType(DeleteType.DELETE_MULTIPLE_VERSIONS);
+ qualifierBuilder.setTimestamp(111333);
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ mutateBuilder.addColumnValue(valueBuilder.build());
+
+ MutationProto proto = mutateBuilder.build();
+ // default fields
+ assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());
+
+ // set the default value for equal comparison
+ mutateBuilder = MutationProto.newBuilder(proto);
+ mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
+
+ Delete delete = ProtobufUtil.toDelete(proto);
+
+ // a Delete always carries an empty value,
+ // so add an empty value to each qualifier of the original mutation
+ for (ColumnValue.Builder column:
+ mutateBuilder.getColumnValueBuilderList()) {
+ for (QualifierValue.Builder qualifier:
+ column.getQualifierValueBuilderList()) {
+ qualifier.setValue(ByteString.EMPTY);
+ }
+ }
+ assertEquals(mutateBuilder.build(),
+ ProtobufUtil.toMutation(MutationType.DELETE, delete));
+ }
+
+ /**
+ * Test Put Mutate conversions.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testPut() throws IOException {
+ MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
+ mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
+ mutateBuilder.setMutateType(MutationType.PUT);
+ mutateBuilder.setTimestamp(111111);
+ ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
+ valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
+ QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
+ qualifierBuilder.setValue(ByteString.copyFromUtf8("v1"));
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
+ qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
+ qualifierBuilder.setTimestamp(222222);
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ mutateBuilder.addColumnValue(valueBuilder.build());
+
+ MutationProto proto = mutateBuilder.build();
+ // default fields
+ assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());
+
+ // set the default value for equal comparison
+ mutateBuilder = MutationProto.newBuilder(proto);
+ mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
+
+ Put put = ProtobufUtil.toPut(proto);
+
+ // a Put value always uses the Put's default timestamp if no
+ // value-level timestamp is specified,
+ // so add that timestamp to the original mutation
+ long timestamp = put.getTimeStamp();
+ for (ColumnValue.Builder column:
+ mutateBuilder.getColumnValueBuilderList()) {
+ for (QualifierValue.Builder qualifier:
+ column.getQualifierValueBuilderList()) {
+ if (!qualifier.hasTimestamp()) {
+ qualifier.setTimestamp(timestamp);
+ }
+ }
+ }
+ assertEquals(mutateBuilder.build(),
+ ProtobufUtil.toMutation(MutationType.PUT, put));
+ }
+
+ /**
+ * Test basic Scan conversions.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testScan() throws IOException {
+ ClientProtos.Scan.Builder scanBuilder = ClientProtos.Scan.newBuilder();
+ scanBuilder.setStartRow(ByteString.copyFromUtf8("row1"));
+ scanBuilder.setStopRow(ByteString.copyFromUtf8("row2"));
+ Column.Builder columnBuilder = Column.newBuilder();
+ columnBuilder.setFamily(ByteString.copyFromUtf8("f1"));
+ columnBuilder.addQualifier(ByteString.copyFromUtf8("c1"));
+ columnBuilder.addQualifier(ByteString.copyFromUtf8("c2"));
+ scanBuilder.addColumn(columnBuilder.build());
+
+ columnBuilder.clear();
+ columnBuilder.setFamily(ByteString.copyFromUtf8("f2"));
+ scanBuilder.addColumn(columnBuilder.build());
+
+ ClientProtos.Scan proto = scanBuilder.build();
+
+ // Verify default values
+ assertEquals(1, proto.getMaxVersions());
+ assertEquals(true, proto.getCacheBlocks());
+
+ // Verify fields survive ClientProtos.Scan -> Scan -> ClientProtos.Scan
+ // conversion
+ scanBuilder = ClientProtos.Scan.newBuilder(proto);
+ scanBuilder.setMaxVersions(2);
+ scanBuilder.setCacheBlocks(false);
+ scanBuilder.setCaching(1024);
+ ClientProtos.Scan expectedProto = scanBuilder.build();
+
+ ClientProtos.Scan actualProto = ProtobufUtil.toScan(
+ ProtobufUtil.toScan(expectedProto));
+ assertEquals(expectedProto, actualProto);
+ }
+
+ @Test
+ public void testToCell() throws Exception {
+ KeyValue kv1 =
+ new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);
+ KeyValue kv2 =
+ new KeyValue(Bytes.toBytes("bbb"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);
+ KeyValue kv3 =
+ new KeyValue(Bytes.toBytes("ccc"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);
+ byte[] arr = new byte[kv1.getLength() + kv2.getLength() + kv3.getLength()];
+ System.arraycopy(kv1.getBuffer(), kv1.getOffset(), arr, 0, kv1.getLength());
+ System.arraycopy(kv2.getBuffer(), kv2.getOffset(), arr, kv1.getLength(), kv2.getLength());
+ System.arraycopy(kv3.getBuffer(), kv3.getOffset(), arr, kv1.getLength() + kv2.getLength(),
+ kv3.getLength());
+ ByteBuffer dbb = ByteBuffer.allocateDirect(arr.length);
+ dbb.put(arr);
+ ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(dbb, kv1.getLength(), kv2.getLength());
+ CellProtos.Cell cell = ProtobufUtil.toCell(offheapKV);
+ Cell newOffheapKV = ProtobufUtil.toCell(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY), cell);
+ assertTrue(CellComparator.COMPARATOR.compare(offheapKV, newOffheapKV) == 0);
+ }
+
+ /**
+ * Test Increment Mutate conversions.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testIncrement() throws IOException {
+ long timeStamp = 111111;
+ MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
+ mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
+ mutateBuilder.setMutateType(MutationProto.MutationType.INCREMENT);
+ ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
+ valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
+ QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
+ qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(11L)));
+ qualifierBuilder.setTimestamp(timeStamp);
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
+ qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(22L)));
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ mutateBuilder.addColumnValue(valueBuilder.build());
+
+ MutationProto proto = mutateBuilder.build();
+ // default fields
+ assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());
+
+ // set the default value for equal comparison
+ mutateBuilder = MutationProto.newBuilder(proto);
+ mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
+
+ Increment increment = ProtobufUtil.toIncrement(proto, null);
+ mutateBuilder.setTimestamp(increment.getTimeStamp());
+ assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
+ }
+
+ /**
+ * Test Append Mutate conversions.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testAppend() throws IOException {
+ long timeStamp = 111111;
+ MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
+ mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
+ mutateBuilder.setMutateType(MutationType.APPEND);
+ mutateBuilder.setTimestamp(timeStamp);
+ ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
+ valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
+ QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
+ qualifierBuilder.setValue(ByteString.copyFromUtf8("v1"));
+ qualifierBuilder.setTimestamp(timeStamp);
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
+ qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
+ valueBuilder.addQualifierValue(qualifierBuilder.build());
+ mutateBuilder.addColumnValue(valueBuilder.build());
+
+ MutationProto proto = mutateBuilder.build();
+ // default fields
+ assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());
+
+ // set the default value for equal comparison
+ mutateBuilder = MutationProto.newBuilder(proto);
+ mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
+
+ Append append = ProtobufUtil.toAppend(proto, null);
+
+ // an Append always uses the latest timestamp,
+ // so copy the Append's timestamp into the original mutation
+ mutateBuilder.setTimestamp(append.getTimeStamp());
+ assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
+ }
+
+ private static ProcedureProtos.Procedure.Builder createProcedureBuilder(long procId) {
+ ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder();
+ builder.setProcId(procId);
+ builder.setClassName("java.lang.Object");
+ builder.setSubmittedTime(0);
+ builder.setState(ProcedureProtos.ProcedureState.RUNNABLE);
+ builder.setLastUpdate(0);
+
+ return builder;
+ }
+
+ private static ProcedureProtos.Procedure createProcedure(long procId) {
+ return createProcedureBuilder(procId).build();
+ }
+
+ private static LockServiceProtos.LockedResource createLockedResource(
+ LockServiceProtos.LockedResourceType resourceType, String resourceName,
+ LockServiceProtos.LockType lockType,
+ ProcedureProtos.Procedure exclusiveLockOwnerProcedure, int sharedLockCount) {
+ LockServiceProtos.LockedResource.Builder build = LockServiceProtos.LockedResource.newBuilder();
+ build.setResourceType(resourceType);
+ build.setResourceName(resourceName);
+ build.setLockType(lockType);
+ if (exclusiveLockOwnerProcedure != null) {
+ build.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedure);
+ }
+ build.setSharedLockCount(sharedLockCount);
+
+ return build.build();
+ }
+
+ @Test
+ public void testProcedureInfo() {
+ ProcedureProtos.Procedure.Builder builder = createProcedureBuilder(1);
+ ByteString stateBytes = ByteString.copyFrom(new byte[] { 65 });
+ BytesValue state = BytesValue.newBuilder().setValue(stateBytes).build();
+ builder.addStateMessage(Any.pack(state));
+ ProcedureProtos.Procedure procedure = builder.build();
+
+ String procJson = ProtobufUtil.toProcedureJson(Lists.newArrayList(procedure));
+ assertEquals("[{"
+ + "\"className\":\"java.lang.Object\","
+ + "\"procId\":\"1\","
+ + "\"submittedTime\":\"0\","
+ + "\"state\":\"RUNNABLE\","
+ + "\"lastUpdate\":\"0\","
+ + "\"stateMessage\":[{\"value\":\"QQ==\"}]"
+ + "}]", procJson);
+ }
+
+ @Test
+ public void testServerLockInfo() {
+ LockServiceProtos.LockedResource resource = createLockedResource(
+ LockServiceProtos.LockedResourceType.SERVER, "server",
+ LockServiceProtos.LockType.SHARED, null, 2);
+
+ String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
+ assertEquals("[{"
+ + "\"resourceType\":\"SERVER\","
+ + "\"resourceName\":\"server\","
+ + "\"lockType\":\"SHARED\","
+ + "\"sharedLockCount\":2"
+ + "}]", lockJson);
+ }
+
+ @Test
+ public void testNamespaceLockInfo() {
+ LockServiceProtos.LockedResource resource = createLockedResource(
+ LockServiceProtos.LockedResourceType.NAMESPACE, "ns",
+ LockServiceProtos.LockType.EXCLUSIVE, createProcedure(2), 0);
+
+ String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
+ assertEquals("[{"
+ + "\"resourceType\":\"NAMESPACE\","
+ + "\"resourceName\":\"ns\","
+ + "\"lockType\":\"EXCLUSIVE\","
+ + "\"exclusiveLockOwnerProcedure\":{"
+ + "\"className\":\"java.lang.Object\","
+ + "\"procId\":\"2\","
+ + "\"submittedTime\":\"0\","
+ + "\"state\":\"RUNNABLE\","
+ + "\"lastUpdate\":\"0\""
+ + "},"
+ + "\"sharedLockCount\":0"
+ + "}]", lockJson);
+ }
+
+ @Test
+ public void testTableLockInfo() {
+ LockServiceProtos.LockedResource resource = createLockedResource(
+ LockServiceProtos.LockedResourceType.TABLE, "table",
+ LockServiceProtos.LockType.SHARED, null, 2);
+
+ String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
+ assertEquals("[{"
+ + "\"resourceType\":\"TABLE\","
+ + "\"resourceName\":\"table\","
+ + "\"lockType\":\"SHARED\","
+ + "\"sharedLockCount\":2"
+ + "}]", lockJson);
+ }
+
+ @Test
+ public void testRegionLockInfo() {
+ LockServiceProtos.LockedResource resource = createLockedResource(
+ LockServiceProtos.LockedResourceType.REGION, "region",
+ LockServiceProtos.LockType.EXCLUSIVE, createProcedure(3), 0);
+
+ String lockJson = ProtobufUtil.toLockJson(Lists.newArrayList(resource));
+ assertEquals("[{"
+ + "\"resourceType\":\"REGION\","
+ + "\"resourceName\":\"region\","
+ + "\"lockType\":\"EXCLUSIVE\","
+ + "\"exclusiveLockOwnerProcedure\":{"
+ + "\"className\":\"java.lang.Object\","
+ + "\"procId\":\"3\","
+ + "\"submittedTime\":\"0\","
+ + "\"state\":\"RUNNABLE\","
+ + "\"lastUpdate\":\"0\""
+ + "},"
+ + "\"sharedLockCount\":0"
+ + "}]", lockJson);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
deleted file mode 100644
index 36dabdd..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.NonceKey;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Procedure information
- */
-@InterfaceAudience.Public
-public class ProcedureInfo implements Cloneable {
- private final long procId;
- private final String procName;
- private final String procOwner;
- private final ProcedureState procState;
- private final long parentId;
- private final NonceKey nonceKey;
- private final IOException exception;
- private final long lastUpdate;
- private final long submittedTime;
- private final byte[] result;
-
- private long clientAckTime = -1;
-
- @InterfaceAudience.Private
- public ProcedureInfo(
- final long procId,
- final String procName,
- final String procOwner,
- final ProcedureState procState,
- final long parentId,
- final NonceKey nonceKey,
- final IOException exception,
- final long lastUpdate,
- final long submittedTime,
- final byte[] result) {
- this.procId = procId;
- this.procName = procName;
- this.procOwner = procOwner;
- this.procState = procState;
- this.parentId = parentId;
- this.nonceKey = nonceKey;
- this.lastUpdate = lastUpdate;
- this.submittedTime = submittedTime;
-
- // If the procedure is completed, we should treat exception and result differently
- this.exception = exception;
- this.result = result;
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
- justification="Intentional; calling super class clone doesn't make sense here.")
- public ProcedureInfo clone() {
- return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey,
- exception, lastUpdate, submittedTime, result);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(procName);
- sb.append(" pid=");
- sb.append(procId);
- if (hasParentId()) {
- sb.append(", ppid=");
- sb.append(parentId);
- }
- if (hasOwner()) {
- sb.append(", owner=");
- sb.append(procOwner);
- }
- sb.append(", state=");
- sb.append(procState);
-
- long now = EnvironmentEdgeManager.currentTime();
- sb.append(", submittedTime=");
- sb.append(StringUtils.formatTime(now - submittedTime));
- sb.append(" ago, lastUpdate=");
- sb.append(StringUtils.formatTime(now - submittedTime));
- sb.append(" ago");
-
- if (isFailed()) {
- sb.append(", exception=\"");
- sb.append(this.exception.getMessage());
- sb.append("\"");
- }
- return sb.toString();
- }
-
- public long getProcId() {
- return procId;
- }
-
- public String getProcName() {
- return procName;
- }
-
- public boolean hasOwner() {
- return procOwner != null;
- }
-
- public String getProcOwner() {
- return procOwner;
- }
-
- public ProcedureState getProcState() {
- return procState;
- }
-
- public boolean hasParentId() {
- return (parentId != -1);
- }
-
- public long getParentId() {
- return parentId;
- }
-
- public NonceKey getNonceKey() {
- return nonceKey;
- }
-
- public boolean isFailed() {
- return exception != null;
- }
-
- public IOException getException() {
- if (isFailed()) {
- return this.exception;
- }
- return null;
- }
-
- public String getExceptionFullMessage() {
- assert isFailed();
- final IOException e = getException();
- return e.getCause() + " - " + e.getMessage();
- }
-
- public boolean hasResultData() {
- return result != null;
- }
-
- public byte[] getResult() {
- return result;
- }
-
- public long getSubmittedTime() {
- return submittedTime;
- }
-
- public long getLastUpdate() {
- return lastUpdate;
- }
-
- public long executionTime() {
- return lastUpdate - submittedTime;
- }
-
- @InterfaceAudience.Private
- public boolean hasClientAckTime() {
- return clientAckTime != -1;
- }
-
- @InterfaceAudience.Private
- public long getClientAckTime() {
- return clientAckTime;
- }
-
- @InterfaceAudience.Private
- public void setClientAckTime(final long timestamp) {
- this.clientAckTime = timestamp;
- }
-
- /**
- * Check if the user is this procedure's owner
- * @param procInfo the procedure to check
- * @param user the user
- * @return true if the user is the owner of the procedure,
- * false otherwise or the owner is unknown.
- */
- @InterfaceAudience.Private
- public static boolean isProcedureOwner(final ProcedureInfo procInfo, final User user) {
- if (user == null) {
- return false;
- }
- String procOwner = procInfo.getProcOwner();
- if (procOwner == null) {
- return false;
- }
- return procOwner.equals(user.getShortName());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java
deleted file mode 100644
index 30ecee8..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/procedure2/LockInfo.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.Public
-public class LockInfo {
- @InterfaceAudience.Public
- public enum ResourceType {
- SERVER, NAMESPACE, TABLE, REGION
- }
-
- @InterfaceAudience.Public
- public enum LockType {
- EXCLUSIVE, SHARED
- }
-
- @InterfaceAudience.Public
- public static class WaitingProcedure {
- private LockType lockType;
- private ProcedureInfo procedure;
-
- public WaitingProcedure() {
- }
-
- public LockType getLockType() {
- return lockType;
- }
-
- public void setLockType(LockType lockType) {
- this.lockType = lockType;
- }
-
- public ProcedureInfo getProcedure() {
- return procedure;
- }
-
- public void setProcedure(ProcedureInfo procedure) {
- this.procedure = procedure;
- }
- }
-
- private ResourceType resourceType;
- private String resourceName;
- private LockType lockType;
- private ProcedureInfo exclusiveLockOwnerProcedure;
- private int sharedLockCount;
- private final List<WaitingProcedure> waitingProcedures;
-
- public LockInfo() {
- waitingProcedures = new ArrayList<>();
- }
-
- public ResourceType getResourceType() {
- return resourceType;
- }
-
- public void setResourceType(ResourceType resourceType) {
- this.resourceType = resourceType;
- }
-
- public String getResourceName() {
- return resourceName;
- }
-
- public void setResourceName(String resourceName) {
- this.resourceName = resourceName;
- }
-
- public LockType getLockType() {
- return lockType;
- }
-
- public void setLockType(LockType lockType) {
- this.lockType = lockType;
- }
-
- public ProcedureInfo getExclusiveLockOwnerProcedure() {
- return exclusiveLockOwnerProcedure;
- }
-
- public void setExclusiveLockOwnerProcedure(
- ProcedureInfo exclusiveLockOwnerProcedure) {
- this.exclusiveLockOwnerProcedure = exclusiveLockOwnerProcedure;
- }
-
- public int getSharedLockCount() {
- return sharedLockCount;
- }
-
- public void setSharedLockCount(int sharedLockCount) {
- this.sharedLockCount = sharedLockCount;
- }
-
- public List<WaitingProcedure> getWaitingProcedures() {
- return waitingProcedures;
- }
-
- public void setWaitingProcedures(List<WaitingProcedure> waitingProcedures) {
- this.waitingProcedures.clear();
- this.waitingProcedures.addAll(waitingProcedures);
- }
-
- public void addWaitingProcedure(WaitingProcedure waitingProcedure) {
- waitingProcedures.add(waitingProcedure);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java
new file mode 100644
index 0000000..8d85b9d
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/JRubyFormat.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.common.escape.Escaper;
+import org.apache.hadoop.hbase.shaded.com.google.common.escape.Escapers;
+
+/**
+ * Utility class for converting objects to JRuby.
+ *
+ * It handles null, Boolean, Number, String, byte[], {@code List<Object>} and {@code Map<String, Object>} structures.
+ *
+ * <p>
+ * E.g.
+ * <pre>
+ * Map&lt;String, Object&gt; map = new LinkedHashMap&lt;&gt;();
+ * map.put("null", null);
+ * map.put("boolean", true);
+ * map.put("number", 1);
+ * map.put("string", "str");
+ * map.put("binary", new byte[] { 1, 2, 3 });
+ * map.put("list", Lists.newArrayList(1, "2", true));
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * Calling the {@link #print(Object)} method on this map produces:
+ * <pre>
+ * { null =&gt; '', boolean =&gt; 'true', number =&gt; '1', string =&gt; 'str', binary =&gt; '010203', list =&gt; [ '1', '2', 'true' ] }
+ * </pre>
+ * </p>
+ */
+@InterfaceAudience.Private
+public final class JRubyFormat {
+ private static final Escaper escaper;
+
+ static {
+ escaper = Escapers.builder()
+ .addEscape('\\', "\\\\")
+ .addEscape('\'', "\\'")
+ .addEscape('\n', "\\n")
+ .addEscape('\r', "\\r")
+ .addEscape('\t', "\\t")
+ .addEscape('\f', "\\f")
+ .build();
+ }
+
+ private JRubyFormat() {
+ }
+
+ private static String escape(Object object) {
+ if (object == null) {
+ return "";
+ } else {
+ return escaper.escape(object.toString());
+ }
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private static void appendJRuby(StringBuilder builder, Object object) {
+ if (object == null) {
+ builder.append("''");
+ } else if (object instanceof List) {
+ builder.append("[");
+
+ boolean first = true;
+
+ for (Object element: (List<Object>)object) {
+ if (first) {
+ first = false;
+ builder.append(" ");
+ } else {
+ builder.append(", ");
+ }
+
+ appendJRuby(builder, element);
+ }
+
+ if (!first) {
+ builder.append(" ");
+ }
+
+ builder.append("]");
+ } else if (object instanceof Map) {
+ builder.append("{");
+
+ boolean first = true;
+
+ for (Entry<String, Object> entry: ((Map<String, Object>)object).entrySet()) {
+ if (first) {
+ first = false;
+ builder.append(" ");
+ } else {
+ builder.append(", ");
+ }
+
+ String key = entry.getKey();
+ String escapedKey = escape(key);
+
+ if (key.equals(escapedKey)) {
+ builder.append(key);
+ } else {
+ builder.append("'").append(escapedKey).append("'");
+ }
+
+ builder.append(" => ");
+ appendJRuby(builder, entry.getValue());
+ }
+
+ if (!first) {
+ builder.append(" ");
+ }
+
+ builder.append("}");
+ } else if (object instanceof byte[]) {
+ String byteString = Bytes.toHex((byte[])object);
+ builder.append("'").append(escape(byteString)).append("'");
+ } else {
+ builder.append("'").append(escape(object)).append("'");
+ }
+ }
+
+ public static String print(Object object) {
+ StringBuilder builder = new StringBuilder();
+
+ appendJRuby(builder, object);
+
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestJRubyFormat.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestJRubyFormat.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestJRubyFormat.java
new file mode 100644
index 0000000..96b3da0
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestJRubyFormat.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Checks the text produced by {@link JRubyFormat#print(Object)}.
+ */
+@Category(SmallTests.class)
+public class TestJRubyFormat {
+  /**
+   * Prints a map holding a null, a boolean, a number, a string, a byte array and a list,
+   * and compares the result with the exact expected text.
+   */
+  @Test
+  public void testPrint() {
+    // LinkedHashMap keeps insertion order, so the expected text is deterministic.
+    Map<String, Object> input = new LinkedHashMap<>();
+    input.put("null", null);
+    input.put("boolean", true);
+    input.put("number", 1);
+    input.put("string", "str");
+    input.put("binary", new byte[] { 1, 2, 3 });
+    input.put("list", Lists.newArrayList(1, "2", true));
+
+    String expected = "{ null => '', boolean => 'true', number => '1', "
+        + "string => 'str', binary => '010203', "
+        + "list => [ '1', '2', 'true' ] }";
+
+    assertEquals(expected, JRubyFormat.print(input));
+  }
+
+  /**
+   * Prints a string made of a backslash, a single quote and four control characters
+   * and checks that each of them comes out backslash-escaped.
+   */
+  @Test
+  public void testEscape() {
+    String expected = "'\\\\\\'\\n\\r\\t\\f'";
+
+    assertEquals(expected, JRubyFormat.print("\\\'\n\r\t\f"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java
new file mode 100644
index 0000000..e4d867d
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The kind of lock recorded for a {@link LockedResource}.
+ */
+@InterfaceAudience.Private
+public enum LockType {
+  /** Exclusive lock; {@link LockedResource} tracks a single owner procedure for it. */
+  EXCLUSIVE,
+
+  /** Shared lock; {@link LockedResource} tracks a holder count for it. */
+  SHARED
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java
new file mode 100644
index 0000000..e3320ab
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure2.LockedResourceType;
+
+/**
+ * Describes one locked resource: the resource itself (type and name), the kind of
+ * lock on it, the procedure owning the exclusive lock, the number of shared locks,
+ * and the procedures waiting for the resource.
+ *
+ * <p>All fields are set once in the constructor. The list of waiting procedures is
+ * stored and returned as given, without copying.
+ */
+@InterfaceAudience.Private
+public class LockedResource {
+  private final LockedResourceType resourceType;
+  private final String resourceName;
+  private final LockType lockType;
+  private final Procedure<?> exclusiveLockOwnerProcedure;
+  private final int sharedLockCount;
+  private final List<Procedure<?>> waitingProcedures;
+
+  /**
+   * @param resourceType the type of the locked resource
+   * @param resourceName the name of the locked resource
+   * @param lockType the kind of lock
+   * @param exclusiveLockOwnerProcedure the procedure owning the exclusive lock
+   * @param sharedLockCount the number of shared locks
+   * @param waitingProcedures the procedures waiting for the resource
+   */
+  public LockedResource(LockedResourceType resourceType, String resourceName,
+      LockType lockType, Procedure<?> exclusiveLockOwnerProcedure,
+      int sharedLockCount, List<Procedure<?>> waitingProcedures) {
+    this.resourceType = resourceType;
+    this.resourceName = resourceName;
+    this.lockType = lockType;
+    this.exclusiveLockOwnerProcedure = exclusiveLockOwnerProcedure;
+    this.sharedLockCount = sharedLockCount;
+    this.waitingProcedures = waitingProcedures;
+  }
+
+  /** @return the type of the locked resource */
+  public LockedResourceType getResourceType() {
+    return this.resourceType;
+  }
+
+  /** @return the name of the locked resource */
+  public String getResourceName() {
+    return this.resourceName;
+  }
+
+  /** @return the kind of lock */
+  public LockType getLockType() {
+    return this.lockType;
+  }
+
+  /** @return the procedure owning the exclusive lock, as passed to the constructor */
+  public Procedure<?> getExclusiveLockOwnerProcedure() {
+    return this.exclusiveLockOwnerProcedure;
+  }
+
+  /** @return the number of shared locks */
+  public int getSharedLockCount() {
+    return this.sharedLockCount;
+  }
+
+  /** @return the procedures waiting for the resource (the same list instance given at construction) */
+  public List<Procedure<?>> getWaitingProcedures() {
+    return this.waitingProcedures;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
new file mode 100644
index 0000000..29820f1
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The type of resource a {@link LockedResource} refers to.
+ */
+@InterfaceAudience.Private
+public enum LockedResourceType {
+  SERVER,
+  NAMESPACE,
+  TABLE,
+  REGION
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 335e83c..db488c9 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -165,17 +163,17 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* The user-level code of the procedure may have some state to
* persist (e.g. input arguments or current position in the processing state) to
* be able to resume on failure.
- * @param stream the stream that will contain the user serialized data
+ * @param serializer stores the serializable state
*/
- protected abstract void serializeStateData(final OutputStream stream)
+ protected abstract void serializeStateData(final ProcedureStateSerializer serializer)
throws IOException;
/**
* Called on store load to allow the user to decode the previously serialized
* state.
- * @param stream the stream that contains the user serialized data
+ * @param serializer contains the serialized state
*/
- protected abstract void deserializeStateData(final InputStream stream)
+ protected abstract void deserializeStateData(final ProcedureStateSerializer serializer)
throws IOException;
/**
@@ -184,7 +182,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Framework will call this method just before it invokes {@link #execute(Object)}.
* It calls {@link #releaseLock(Object)} after the call to execute.
*
- * <p>If you need to hold the lock for the life of the Procdure -- i.e. you do not
+ * <p>If you need to hold the lock for the life of the Procedure -- i.e. you do not
* want any other Procedure interfering while this Procedure is running, see
* {@link #holdLock(Object)}.
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index d0052f6..9337530 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -241,7 +239,7 @@ public class ProcedureExecutor<TEnvironment> {
}
/**
- * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureInfo.
+ * Map the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
* The user of ProcedureExecutor should call getResult(procId) to get the result.
*/
@@ -750,14 +748,22 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- private static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
+ public static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
private String procName;
- public FailedProcedure(NonceKey nonceKey, String procName, User owner,
- IOException exception) {
+ public FailedProcedure() {
+ }
+
+ public FailedProcedure(long procId, String procName, User owner,
+ NonceKey nonceKey, IOException exception) {
this.procName = procName;
- setNonceKey(nonceKey);
+ setProcId(procId);
+ setState(ProcedureState.ROLLEDBACK);
setOwner(owner);
+ setNonceKey(nonceKey);
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ setSubmittedTime(currentTime);
+ setLastUpdate(currentTime);
setFailure(Objects.toString(exception.getMessage(), ""), exception);
}
@@ -785,11 +791,13 @@ public class ProcedureExecutor<TEnvironment> {
}
@Override
- protected void serializeStateData(OutputStream stream) throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
@Override
- protected void deserializeStateData(InputStream stream) throws IOException {
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
}
@@ -809,7 +817,9 @@ public class ProcedureExecutor<TEnvironment> {
final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
if (procId == null || completed.containsKey(procId)) return;
- Procedure proc = new FailedProcedure(nonceKey, procName, procOwner, exception);
+ Procedure<?> proc = new FailedProcedure(procId.longValue(),
+ procName, procOwner, nonceKey, exception);
+
completed.putIfAbsent(procId, new CompletedProcedureRetainer(proc));
}
@@ -1045,15 +1055,17 @@ public class ProcedureExecutor<TEnvironment> {
}
/**
- * List procedures.
+ * Get procedures.
* @return the procedures in a list
*/
- public List<Procedure> listProcedures() {
- final List<Procedure> procedureLists = new ArrayList<>(procedures.size() + completed.size());
- procedureLists.addAll(procedures.values());
+ public List<Procedure<?>> getProcedures() {
+ final List<Procedure<?>> procedureLists = new ArrayList<>(procedures.size() + completed.size());
+ for (Procedure<?> procedure : procedures.values()) {
+ procedureLists.add(procedure);
+ }
// Note: The procedure could show up twice in the list with different state, as
// it could complete after we walk through procedures list and insert into
- // procedureList - it is ok, as we will use the information in the ProcedureInfo
+ // procedureList - it is ok, as we will use the information in the Procedure
// to figure it out; to prevent this would increase the complexity of the logic.
for (CompletedProcedureRetainer retainer: completed.values()) {
procedureLists.add(retainer.getProcedure());
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
index b148dae..596ff21 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.procedure2;
-import java.io.InputStream;
-import java.io.OutputStream;
-
+import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -58,12 +56,12 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn
}
@Override
- public void serializeStateData(final OutputStream stream) {
- throw new UnsupportedOperationException();
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
@Override
- public void deserializeStateData(final InputStream stream) {
- throw new UnsupportedOperationException();
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index a5a126d..1e4240a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -125,13 +125,13 @@ public interface ProcedureScheduler {
* List lock queues.
* @return the locks
*/
- // TODO: This seems to be the wrong place to hang this method.
- List<LockInfo> listLocks();
+ List<LockedResource> getLocks();
/**
- * @return {@link LockInfo} for resource of specified type & name. null if resource is not locked.
+ * @return {@link LockedResource} for resource of specified type & name. null if resource is not locked.
*/
- LockInfo getLockInfoForResource(LockInfo.ResourceType resourceType, String resourceName);
+ LockedResource getLockResource(LockedResourceType resourceType, String resourceName);
+
/**
* Returns the number of elements in this queue.
* @return the number of elements in this queue.
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java
new file mode 100644
index 0000000..03842d9
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureStateSerializer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+
+/**
+ * Reads and writes the state of a procedure as protobuf messages. An instance is
+ * handed to a procedure's {@code serializeStateData} and {@code deserializeStateData}
+ * methods.
+ */
+@InterfaceAudience.Private
+public interface ProcedureStateSerializer {
+  /**
+   * Stores one message as part of the procedure's state.
+   *
+   * @param message the message to store
+   * @throws IOException if the message cannot be stored
+   */
+  void serialize(Message message) throws IOException;
+
+  /**
+   * Reads one message of the given type from the procedure's state.
+   *
+   * @param clazz the class of the message to read
+   * @param <M> the message type
+   * @return the message that was read
+   * @throws IOException if the message cannot be read or parsed
+   */
+  <M extends Message> M deserialize(Class<M> clazz) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
index 3232f2b..2381abd 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
@@ -18,21 +18,22 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
-
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.ProcedureState;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Any;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Parser;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.NonceKey;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
/**
* Helper to convert to/from ProcedureProtos
*/
@@ -85,6 +86,69 @@ public final class ProcedureUtil {
// ==========================================================================
/**
+ * A serializer for our Procedures. Instead of the previous serializer, it
+ * uses the stateMessage list to store the internal state of the Procedures.
+ */
+ private static class StateSerializer implements ProcedureStateSerializer {
+   private final ProcedureProtos.Procedure.Builder builder;
+   // Position of the next state message handed out by deserialize().
+   private int deserializeIndex;
+
+   public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
+     this.builder = builder;
+   }
+
+   /** Packs the message into an {@link Any} and appends it to the builder's state messages. */
+   @Override
+   public void serialize(Message message) throws IOException {
+     builder.addStateMessage(Any.pack(message));
+   }
+
+   /**
+    * Unpacks the next state message as {@code clazz}; messages are handed out in list order.
+    *
+    * @throws IOException if all state messages were already read or unpacking fails
+    */
+   @Override
+   public <M extends Message> M deserialize(Class<M> clazz)
+       throws IOException {
+     final int index = deserializeIndex;
+
+     if (index >= builder.getStateMessageCount()) {
+       throw new IOException("Invalid state message index: " + index);
+     }
+
+     // Advance before unpacking, so a failed unpack still consumes the message.
+     deserializeIndex = index + 1;
+
+     try {
+       return builder.getStateMessage(index).unpack(clazz);
+     } catch (InvalidProtocolBufferException e) {
+       throw e.unwrapIOException();
+     }
+   }
+ }
+
+ /**
+ * A serializer (deserializer) for those Procedures which were serialized
+ * before this patch. It deserializes the old, binary stateData field.
+ */
+ private static class CompatStateSerializer implements ProcedureStateSerializer {
+   private InputStream inputStream;
+
+   public CompatStateSerializer(InputStream inputStream) {
+     this.inputStream = inputStream;
+   }
+
+   /** Not supported: this class only reads state. */
+   @Override
+   public void serialize(Message message) throws IOException {
+     throw new UnsupportedOperationException();
+   }
+
+   /**
+    * Parses the next length-delimited message of type {@code clazz} from the stream.
+    *
+    * @throws IOException if reading or parsing fails
+    */
+   @SuppressWarnings("unchecked")
+   @Override
+   public <M extends Message> M deserialize(Class<M> clazz)
+       throws IOException {
+     // The parser is obtained from the default instance of the requested message class.
+     M prototype = Internal.getDefaultInstance(clazz);
+     Parser<M> messageParser = (Parser<M>) prototype.getParserForType();
+
+     try {
+       return messageParser.parseDelimitedFrom(inputStream);
+     } catch (InvalidProtocolBufferException e) {
+       throw e.unwrapIOException();
+     }
+   }
+ }
+
+ /**
* Helper to convert the procedure to protobuf.
* Used by ProcedureStore implementations.
*/
@@ -130,15 +194,8 @@ public final class ProcedureUtil {
builder.setResult(UnsafeByteOperations.unsafeWrap(result));
}
- final ByteString.Output stateStream = ByteString.newOutput();
- try {
- proc.serializeStateData(stateStream);
- if (stateStream.size() > 0) {
- builder.setStateData(stateStream.toByteString());
- }
- } finally {
- stateStream.close();
- }
+ ProcedureStateSerializer serializer = new StateSerializer(builder);
+ proc.serializeStateData(serializer);
if (proc.getNonceKey() != null) {
builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
@@ -198,87 +255,62 @@ public final class ProcedureUtil {
proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
}
- // we want to call deserialize even when the stream is empty, mainly for testing.
- proc.deserializeStateData(proto.getStateData().newInput());
+ ProcedureStateSerializer serializer = null;
+
+ if (proto.getStateMessageCount() > 0) {
+ serializer = new StateSerializer(proto.toBuilder());
+ } else if (proto.hasStateData()) {
+ InputStream inputStream = proto.getStateData().newInput();
+ serializer = new CompatStateSerializer(inputStream);
+ }
+
+ if (serializer != null) {
+ proc.deserializeStateData(serializer);
+ }
return proc;
}
// ==========================================================================
- // convert to and from ProcedureInfo object
+ // convert from LockedResource object
// ==========================================================================
- /**
- * @return Convert the current {@link ProcedureInfo} into a Protocol Buffers Procedure
- * instance.
- */
- public static ProcedureProtos.Procedure convertToProtoProcedure(final ProcedureInfo procInfo) {
- final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder();
+ public static LockServiceProtos.LockedResourceType convertToProtoResourceType(
+ LockedResourceType resourceType) {
+ return LockServiceProtos.LockedResourceType.valueOf(resourceType.name());
+ }
- builder.setClassName(procInfo.getProcName());
- builder.setProcId(procInfo.getProcId());
- builder.setSubmittedTime(procInfo.getSubmittedTime());
- builder.setState(ProcedureProtos.ProcedureState.valueOf(procInfo.getProcState().name()));
- builder.setLastUpdate(procInfo.getLastUpdate());
+ public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
+ return LockServiceProtos.LockType.valueOf(lockType.name());
+ }
- if (procInfo.hasParentId()) {
- builder.setParentId(procInfo.getParentId());
- }
+ public static LockServiceProtos.LockedResource convertToProtoLockedResource(
+ LockedResource lockedResource) throws IOException
+ {
+ LockServiceProtos.LockedResource.Builder builder =
+ LockServiceProtos.LockedResource.newBuilder();
- if (procInfo.hasOwner()) {
- builder.setOwner(procInfo.getProcOwner());
- }
+ builder
+ .setResourceType(convertToProtoResourceType(lockedResource.getResourceType()))
+ .setResourceName(lockedResource.getResourceName())
+ .setLockType(convertToProtoLockType(lockedResource.getLockType()));
- if (procInfo.isFailed()) {
- builder.setException(ForeignExceptionUtil.toProtoForeignException(procInfo.getException()));
- }
+ Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure();
- if (procInfo.hasResultData()) {
- builder.setResult(UnsafeByteOperations.unsafeWrap(procInfo.getResult()));
+ if (exclusiveLockOwnerProcedure != null) {
+ ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto =
+ convertToProtoProcedure(exclusiveLockOwnerProcedure);
+ builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto);
}
- return builder.build();
- }
+ builder.setSharedLockCount(lockedResource.getSharedLockCount());
- /**
- * Helper to convert the protobuf object.
- * @return Convert the current Protocol Buffers Procedure to {@link ProcedureInfo}
- * instance.
- */
- public static ProcedureInfo convertToProcedureInfo(final ProcedureProtos.Procedure procProto) {
- NonceKey nonceKey = null;
- if (procProto.getNonce() != HConstants.NO_NONCE) {
- nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce());
+ for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) {
+ ProcedureProtos.Procedure waitingProcedureProto =
+ convertToProtoProcedure(waitingProcedure);
+ builder.addWaitingProcedures(waitingProcedureProto);
}
- return new ProcedureInfo(procProto.getProcId(), procProto.getClassName(),
- procProto.hasOwner() ? procProto.getOwner() : null,
- convertToProcedureState(procProto.getState()),
- procProto.hasParentId() ? procProto.getParentId() : -1, nonceKey,
- procProto.hasException() ?
- ForeignExceptionUtil.toIOException(procProto.getException()) : null,
- procProto.getLastUpdate(), procProto.getSubmittedTime(),
- procProto.hasResult() ? procProto.getResult().toByteArray() : null);
- }
-
- public static ProcedureState convertToProcedureState(ProcedureProtos.ProcedureState state) {
- return ProcedureState.valueOf(state.name());
- }
-
- public static ProcedureInfo convertToProcedureInfo(final Procedure proc) {
- return convertToProcedureInfo(proc, null);
- }
-
- /**
- * Helper to create the ProcedureInfo from Procedure.
- */
- public static ProcedureInfo convertToProcedureInfo(final Procedure proc,
- final NonceKey nonceKey) {
- final RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
- return new ProcedureInfo(proc.getProcId(), proc.toStringClass(), proc.getOwner(),
- convertToProcedureState(proc.getState()),
- proc.hasParent() ? proc.getParentProcId() : -1, nonceKey,
- exception != null ? exception.unwrapRemoteIOException() : null,
- proc.getLastUpdate(), proc.getSubmittedTime(), proc.getResult());
+ return builder.build();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
index 64bb278..f03653f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData;
@@ -69,15 +66,17 @@ public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvir
}
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
SequentialProcedureData.Builder data = SequentialProcedureData.newBuilder();
data.setExecuted(executed);
- data.build().writeDelimitedTo(stream);
+ serializer.serialize(data.build());
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
- SequentialProcedureData data = SequentialProcedureData.parseDelimitedFrom(stream);
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ SequentialProcedureData data = serializer.deserialize(SequentialProcedureData.class);
executed = data.getExecuted();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
index 69c59c8..3c2445c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
@@ -78,12 +78,13 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
}
@Override
- public List<LockInfo> listLocks() {
+ public List<LockedResource> getLocks() {
return Collections.emptyList();
}
@Override
- public LockInfo getLockInfoForResource(LockInfo.ResourceType resourceType, String resourceName) {
+ public LockedResource getLockResource(LockedResourceType resourceType,
+ String resourceName) {
return null;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 5de5066..25dfe8b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -285,17 +283,19 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
}
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
for (int i = 0; i < stateCount; ++i) {
data.addState(states[i]);
}
- data.build().writeDelimitedTo(stream);
+ serializer.serialize(data.build());
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
- StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream);
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class);
stateCount = data.getStateCount();
if (stateCount > 0) {
states = new int[stateCount];
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 5cdbc35..99d3c28 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -37,11 +35,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BytesValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
@@ -367,11 +366,13 @@ public class ProcedureTestingUtility {
protected boolean abort(TEnv env) { return false; }
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
}
@@ -416,19 +417,23 @@ public class ProcedureTestingUtility {
}
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
- StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0);
- if (data != null) stream.write(data);
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
+ BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
+ serializer.serialize(builder.build());
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
- int len = StreamUtils.readRawVarint32(stream);
- if (len > 0) {
- data = new byte[len];
- stream.read(data);
- } else {
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ BytesValue bytesValue = serializer.deserialize(BytesValue.class);
+ ByteString dataString = bytesValue.getValue();
+
+ if (dataString.isEmpty()) {
data = null;
+ } else {
+ data = dataString.toByteArray();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
index b81e0f9..ce9795f 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -28,10 +26,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -42,8 +40,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureEvents {
@@ -163,15 +159,23 @@ public class TestProcedureEvents {
}
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
- StreamUtils.writeRawVInt32(stream, ntimeouts.get());
- StreamUtils.writeRawVInt32(stream, maxTimeouts);
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get());
+ serializer.serialize(ntimeoutsBuilder.build());
+
+ Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts);
+ serializer.serialize(maxTimeoutsBuilder.build());
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
- ntimeouts.set(StreamUtils.readRawVarint32(stream));
- maxTimeouts = StreamUtils.readRawVarint32(stream);
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class);
+ ntimeouts.set(ntimeoutsValue.getValue());
+
+ Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class);
+ maxTimeouts = maxTimeoutsValue.getValue();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 9681bfb..f1dadb9 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
@@ -31,6 +29,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -382,17 +381,19 @@ public class TestProcedureRecovery {
}
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
- super.serializeStateData(stream);
- stream.write(Bytes.toBytes(iResult));
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ super.serializeStateData(serializer);
+ Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
+ serializer.serialize(builder.build());
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
- super.deserializeStateData(stream);
- byte[] data = new byte[4];
- stream.read(data);
- iResult = Bytes.toInt(data);
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ super.deserializeStateData(serializer);
+ Int32Value value = serializer.deserialize(Int32Value.class);
+ iResult = value.getValue();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
index bd614e3..80264f5 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,9 +27,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int64Value;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -195,13 +193,17 @@ public class TestProcedureReplayOrder {
protected boolean abort(TestProcedureEnv env) { return true; }
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
- StreamUtils.writeLong(stream, execId);
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId);
+ serializer.serialize(builder.build());
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
- execId = StreamUtils.readLong(stream);
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ Int64Value value = serializer.deserialize(Int64Value.class);
+ execId = value.getValue();
step = 2;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
index 0146bc7..f86df2d 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -251,11 +249,13 @@ public class TestProcedureSuspended {
protected boolean abort(TestProcEnv env) { return false; }
@Override
- protected void serializeStateData(final OutputStream stream) throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
@Override
- protected void deserializeStateData(final InputStream stream) throws IOException {
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
index 78daf5a..af25108 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
@@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -57,11 +54,13 @@ public class TestProcedureToString {
}
@Override
- protected void serializeStateData(OutputStream stream) throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
@Override
- protected void deserializeStateData(InputStream stream) throws IOException {
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/359fed7b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
index 7f98b80..dec5854 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
@@ -18,11 +18,7 @@
package org.apache.hadoop.hbase.procedure2;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.util.JsonFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -34,8 +30,6 @@ import static org.junit.Assert.assertEquals;
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureUtil {
- private static final Log LOG = LogFactory.getLog(TestProcedureUtil.class);
-
@Test
public void testValidation() throws Exception {
ProcedureUtil.validateClass(new TestProcedure(10));
@@ -49,34 +43,15 @@ public class TestProcedureUtil {
@Test
public void testConvert() throws Exception {
// check Procedure to protobuf conversion
- final TestProcedure proc1 = new TestProcedure(10);
+ final TestProcedure proc1 = new TestProcedure(10, 1, new byte[] { 65 });
final ProcedureProtos.Procedure proto1 = ProcedureUtil.convertToProtoProcedure(proc1);
final TestProcedure proc2 = (TestProcedure)ProcedureUtil.convertToProcedure(proto1);
final ProcedureProtos.Procedure proto2 = ProcedureUtil.convertToProtoProcedure(proc2);
assertEquals(false, proto2.hasResult());
assertEquals("Procedure protobuf does not match", proto1, proto2);
-
- // remove the state-data from the procedure protobuf to compare it to the gen ProcedureInfo
- final ProcedureProtos.Procedure pbproc = proto2.toBuilder().clearStateData().build();
-
- // check ProcedureInfo to protobuf conversion
- final ProcedureInfo protoInfo1 = ProcedureUtil.convertToProcedureInfo(proc1);
- final ProcedureProtos.Procedure proto3 = ProcedureUtil.convertToProtoProcedure(protoInfo1);
- final ProcedureInfo protoInfo2 = ProcedureUtil.convertToProcedureInfo(proto3);
- final ProcedureProtos.Procedure proto4 = ProcedureUtil.convertToProtoProcedure(protoInfo2);
- assertEquals("ProcedureInfo protobuf does not match", proto3, proto4);
- assertEquals("ProcedureInfo/Procedure protobuf does not match", pbproc, proto3);
- assertEquals("ProcedureInfo/Procedure protobuf does not match", pbproc, proto4);
}
public static class TestProcedureNoDefaultConstructor extends TestProcedure {
public TestProcedureNoDefaultConstructor(int x) {}
}
-
- public static void main(final String [] args) throws Exception {
- final TestProcedure proc1 = new TestProcedure(10);
- final ProcedureProtos.Procedure proto1 = ProcedureUtil.convertToProtoProcedure(proc1);
- JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();
- System.out.println(printer.print(proto1));
- }
}