You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/02/26 22:54:34 UTC
svn commit: r1450465 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/coprocessor/
test/java/org/apache/hadoop/hbase/regionserver/
Author: tedyu
Date: Tue Feb 26 21:54:34 2013
New Revision: 1450465
URL: http://svn.apache.org/r1450465
Log:
HBASE-4210 Allow coprocessor to interact with batches per region sent from a client (Anoop)
Added:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1450465&r1=1450464&r2=1450465&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Tue Feb 26 21:54:34 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Ap
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.filter.Wr
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -185,6 +187,16 @@ public abstract class BaseRegionObserver
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException {
}
+
+ @Override
+ public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ }
+
+ @Override
+ public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ }
@Override
public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e,
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1450465&r1=1450464&r2=1450465&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Tue Feb 26 21:54:34 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Ap
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.filter.Wr
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -404,6 +406,29 @@ public interface RegionObserver extends
throws IOException;
/**
+ * This will be called for every batch mutation operation happening at the server. This will be
+ * called after acquiring the locks on the mutating rows and after applying the proper timestamp
+ * for each Mutation at the server. The batch may contain Put/Delete. By setting OperationStatus
+ * of Mutations ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
+ * {@link RegionObserver} can make HRegion skip these Mutations.
+ * @param c the environment provided by the region server
+ * @param miniBatchOp batch of Mutations getting applied to region.
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+
+ /**
+ * This will be called after applying a batch of Mutations on a region. The Mutations are added to
+ * memstore and WAL.
+ * @param c the environment provided by the region server
+ * @param miniBatchOp batch of Mutations applied to region.
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+
+ /**
* Called before checkAndPut
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1450465&r1=1450464&r2=1450465&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Feb 26 21:54:34 2013
@@ -2263,6 +2263,14 @@ public class HRegion implements HeapSize
// ----------------------------------
w = mvcc.beginMemstoreInsert();
+ // calling the pre CP hook for batch mutation
+ if (coprocessorHost != null) {
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+ new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+ batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
+ if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
+ }
+
// ------------------------------------
// STEP 3. Write back to memstore
// Write to memstore. It is ok to write to memstore
@@ -2336,6 +2344,14 @@ public class HRegion implements HeapSize
syncOrDefer(txid);
}
walSyncSuccessful = true;
+ // calling the post CP hook for batch mutation
+ if (coprocessorHost != null) {
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+ new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+ batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
+ coprocessorHost.postBatchMutate(miniBatchOp);
+ }
+
// ------------------------------------------------------------------
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java?rev=1450465&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java Tue Feb 26 21:54:34 2013
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Wraps together the mutations which are applied as a batch to the region and their operation
+ * status and WALEdits.
+ * @see RegionObserver#preBatchMutate(ObserverContext, MiniBatchOperationInProgress)
+ * @see RegionObserver#postBatchMutate(ObserverContext, MiniBatchOperationInProgress)
+ * @param <T> {@code Pair<Mutation, Integer>} pair of Mutations and associated rowlock ids.
+ */
+public class MiniBatchOperationInProgress<T> {
+ private final T[] operations;
+ private final OperationStatus[] retCodeDetails;
+ private final WALEdit[] walEditsFromCoprocessors;
+ private final int firstIndex;
+ private final int lastIndexExclusive;
+
+ public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
+ WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
+ this.operations = operations;
+ this.retCodeDetails = retCodeDetails;
+ this.walEditsFromCoprocessors = walEditsFromCoprocessors;
+ this.firstIndex = firstIndex;
+ this.lastIndexExclusive = lastIndexExclusive;
+ }
+
+ /**
+ * @return The number of operations(Mutations) involved in this batch.
+ */
+ public int size() {
+ return this.lastIndexExclusive - this.firstIndex;
+ }
+
+ /**
+ * @param index
+ * @return The operation(Mutation) at the specified position.
+ */
+ public T getOperation(int index) {
+ return operations[getAbsoluteIndex(index)];
+ }
+
+ /**
+ * Sets the status code for the operation(Mutation) at the specified position.
+ * By setting this status, {@link RegionObserver} can make HRegion skip Mutations.
+ * @param index
+ * @param opStatus
+ */
+ public void setOperationStatus(int index, OperationStatus opStatus) {
+ this.retCodeDetails[getAbsoluteIndex(index)] = opStatus;
+ }
+
+ /**
+ * @param index
+ * @return The status code for the operation(Mutation) at the specified position.
+ */
+ public OperationStatus getOperationStatus(int index) {
+ return this.retCodeDetails[getAbsoluteIndex(index)];
+ }
+
+ /**
+ * Sets the walEdit for the operation(Mutation) at the specified position.
+ * @param index
+ * @param walEdit
+ */
+ public void setWalEdit(int index, WALEdit walEdit) {
+ this.walEditsFromCoprocessors[getAbsoluteIndex(index)] = walEdit;
+ }
+
+ /**
+ * @param index
+ * @return The walEdit for the operation(Mutation) at the specified position.
+ */
+ public WALEdit getWalEdit(int index) {
+ return this.walEditsFromCoprocessors[getAbsoluteIndex(index)];
+ }
+
+ private int getAbsoluteIndex(int index) {
+ if (index < 0 || this.firstIndex + index >= this.lastIndexExclusive) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ return this.firstIndex + index;
+ }
+}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1450465&r1=1450464&r2=1450465&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Feb 26 21:54:34 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Ap
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -885,6 +886,54 @@ public class RegionCoprocessorHost
}
}
}
+
+ /**
+ * @param miniBatchOp
+ * @return true if default processing should be bypassed
+ * @throws IOException
+ */
+ public boolean preBatchMutate(
+ final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ boolean bypass = false;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass;
+ }
+
+ /**
+ * @param miniBatchOp
+ * @throws IOException
+ */
+ public void postBatchMutate(
+ final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ }
/**
* @param row row to check
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1450465&r1=1450464&r2=1450465&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Tue Feb 26 21:54:34 2013
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Leases;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -98,7 +100,9 @@ public class SimpleRegionObserver extend
boolean hadPostScannerOpen = false;
boolean hadPreBulkLoadHFile = false;
boolean hadPostBulkLoadHFile = false;
-
+ boolean hadPreBatchMutate = false;
+ boolean hadPostBatchMutate = false;
+
@Override
public void start(CoprocessorEnvironment e) throws IOException {
// this only makes sure that leases and locks are available to coprocessors
@@ -392,6 +396,26 @@ public class SimpleRegionObserver extend
}
@Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ RegionCoprocessorEnvironment e = c.getEnvironment();
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(miniBatchOp);
+ hadPreBatchMutate = true;
+ }
+
+ @Override
+ public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ RegionCoprocessorEnvironment e = c.getEnvironment();
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(miniBatchOp);
+ hadPostBatchMutate = true;
+ }
+
+ @Override
public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
final byte[] row, final byte[] family, final Result result)
throws IOException {
@@ -483,6 +507,15 @@ public class SimpleRegionObserver extend
public boolean hadPostPut() {
return hadPostPut;
}
+
+ public boolean hadPreBatchMutate() {
+ return hadPreBatchMutate;
+ }
+
+ public boolean hadPostBatchMutate() {
+ return hadPostBatchMutate;
+ }
+
public boolean hadDelete() {
return !beforeDelete;
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1450465&r1=1450464&r2=1450465&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Tue Feb 26 21:54:34 2013
@@ -102,9 +102,9 @@ public class TestRegionObserverInterface
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadDelete"},
+ "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE,
- new Boolean[] {false, false, true, true, false}
+ new Boolean[] {false, false, true, true, true, true, false}
);
Get get = new Get(ROW);
@@ -128,9 +128,9 @@ public class TestRegionObserverInterface
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadDelete"},
+ "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE,
- new Boolean[] {true, true, true, true, true}
+ new Boolean[] {true, true, true, true, true, true, true}
);
util.deleteTable(tableName);
table.close();
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java?rev=1450465&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java Tue Feb 26 21:54:34 2013
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestMiniBatchOperationInProgress {
+
+ @Test
+ public void testMiniBatchOperationInProgressMethods() {
+ Pair<Mutation, Integer>[] operations = new Pair[10];
+ OperationStatus[] retCodeDetails = new OperationStatus[10];
+ WALEdit[] walEditsFromCoprocessors = new WALEdit[10];
+ for (int i = 0; i < 10; i++) {
+ operations[i] = new Pair<Mutation, Integer>(new Put(Bytes.toBytes(i)), null);
+ }
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatch =
+ new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(operations, retCodeDetails,
+ walEditsFromCoprocessors, 0, 5);
+
+ assertEquals(5, miniBatch.size());
+ assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow()));
+ assertTrue(Bytes.equals(Bytes.toBytes(2), miniBatch.getOperation(2).getFirst().getRow()));
+ assertTrue(Bytes.equals(Bytes.toBytes(4), miniBatch.getOperation(4).getFirst().getRow()));
+ try {
+ miniBatch.getOperation(5);
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+ miniBatch.setOperationStatus(1, OperationStatus.FAILURE);
+ assertEquals(OperationStatus.FAILURE, retCodeDetails[1]);
+ try {
+ miniBatch.setOperationStatus(6, OperationStatus.FAILURE);
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+ try {
+ miniBatch.setWalEdit(5, new WALEdit());
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+
+ miniBatch = new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(operations,
+ retCodeDetails, walEditsFromCoprocessors, 7, 10);
+ try {
+ miniBatch.setWalEdit(-1, new WALEdit());
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+ try {
+ miniBatch.getOperation(-1);
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+ try {
+ miniBatch.getOperation(3);
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+ try {
+ miniBatch.getOperationStatus(9);
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+ try {
+ miniBatch.setOperationStatus(3, OperationStatus.FAILURE);
+ fail("Should throw Exception while accessing out of range");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ }
+ assertTrue(Bytes.equals(Bytes.toBytes(7), miniBatch.getOperation(0).getFirst().getRow()));
+ assertTrue(Bytes.equals(Bytes.toBytes(9), miniBatch.getOperation(2).getFirst().getRow()));
+ miniBatch.setOperationStatus(1, OperationStatus.SUCCESS);
+ assertEquals(OperationStatus.SUCCESS, retCodeDetails[8]);
+ WALEdit wal = new WALEdit();
+ miniBatch.setWalEdit(0, wal);
+ assertEquals(wal, walEditsFromCoprocessors[7]);
+ }
+}