You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/05/02 22:52:04 UTC
svn commit: r1333196 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hb...
Author: mbautin
Date: Wed May 2 20:52:03 2012
New Revision: 1333196
URL: http://svn.apache.org/viewvc?rev=1333196&view=rev
Log:
[89-fb] Allow atomic put/delete in one call: Port HBASE-3584 and HBASE-5203 to 0.89
Author: aaiyer
Summary:
Initial Patch. Needs clean up.
Out for some initial thoughts on a few issues.
Not ready for thorough review.
Test Plan:
mr test. -- Test Failures on ext-89:
TestRegionStateOnMasterFailure, TestRegionFavoredNodes
Reviewers: kannan, kranganathan
Reviewed By: kannan
CC: HBase Diffs Facebook Group, aaiyer, Kannan, madhuvaidya
Differential Revision: https://reviews.facebook.net/D1713
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java Wed May 2 20:52:03 2012
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.Map;
+
+public interface Attributes {
+ /**
+ * Sets an attribute.
+ * In case value = null attribute is removed from the attributes map.
+ * Attribute names starting with _ indicate system attributes.
+ * @param name attribute name
+ * @param value attribute value
+ */
+ public void setAttribute(String name, byte[] value);
+
+ /**
+ * Gets an attribute
+ * @param name attribute name
+ * @return attribute value if attribute is set, <tt>null</tt> otherwise
+ */
+ public byte[] getAttribute(String name);
+
+ /**
+ * Gets all attributes
+ * @return unmodifiable map of all attributes
+ */
+ public Map<String, byte[]> getAttributesMap();
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java Wed May 2 20:52:03 2012
@@ -66,16 +66,9 @@ import java.util.TreeMap;
* deleteFamily -- then you need to use the method overrides that take a
* timestamp. The constructor timestamp is not referenced.
*/
-public class Delete extends Operation
+public class Delete extends Mutation
implements Writable, Row, Comparable<Row> {
- private static final byte DELETE_VERSION = (byte)1;
-
- private byte [] row = null;
- // This ts is only used when doing a deleteRow. Anything less,
- private long ts;
- private long lockId = -1L;
- private final Map<byte [], List<KeyValue>> familyMap =
- new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+ private static final byte DELETE_VERSION = (byte)3;
/** Constructor for Writable. DO NOT USE */
public Delete() {
@@ -124,19 +117,10 @@ public class Delete extends Operation
this.ts = d.getTimeStamp();
this.lockId = d.getLockId();
this.familyMap.putAll(d.getFamilyMap());
+ this.writeToWAL = d.writeToWAL;
}
- public int compareTo(final Row d) {
- return Bytes.compareTo(this.getRow(), d.getRow());
- }
- /**
- * Method to check if the familyMap is empty
- * @return true if empty, false otherwise
- */
- public boolean isEmpty() {
- return familyMap.isEmpty();
- }
/**
* Delete all versions of all columns of the specified family.
@@ -243,102 +227,6 @@ public class Delete extends Operation
return this.familyMap;
}
- /**
- * Method for retrieving the delete's row
- * @return row
- */
- public byte [] getRow() {
- return this.row;
- }
-
- /**
- * Method for retrieving the delete's RowLock
- * @return RowLock
- */
- public RowLock getRowLock() {
- return new RowLock(this.row, this.lockId);
- }
-
- /**
- * Method for retrieving the delete's lock ID.
- *
- * @return The lock ID.
- */
- public long getLockId() {
- return this.lockId;
- }
-
- /**
- * Method for retrieving the delete's timestamp
- * @return timestamp
- */
- public long getTimeStamp() {
- return this.ts;
- }
-
- /**
- * Compile the column family (i.e. schema) information
- * into a Map. Useful for parsing and aggregation by debugging,
- * logging, and administration tools.
- * @return Map
- */
- @Override
- public Map<String, Object> getFingerprint() {
- Map<String, Object> map = new HashMap<String, Object>();
- List<String> families = new ArrayList<String>();
- // ideally, we would also include table information, but that information
- // is not stored in each Operation instance.
- map.put("families", families);
- for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
- families.add(Bytes.toStringBinary(entry.getKey()));
- }
- return map;
- }
-
- /**
- * Compile the details beyond the scope of getFingerprint (row, columns,
- * timestamps, etc.) into a Map along with the fingerprinted information.
- * Useful for debugging, logging, and administration tools.
- * @param maxCols a limit on the number of columns output prior to truncation
- * @return Map
- */
- @Override
- public Map<String, Object> toMap(int maxCols) {
- // we start with the fingerprint map and build on top of it.
- Map<String, Object> map = getFingerprint();
- // replace the fingerprint's simple list of families with a
- // map from column families to lists of qualifiers and kv details
- Map<String, List<Map<String, Object>>> columns =
- new HashMap<String, List<Map<String, Object>>>();
- map.put("families", columns);
- map.put("row", Bytes.toStringBinary(this.row));
- int colCount = 0;
- // iterate through all column families affected by this Delete
- for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
- // map from this family to details for each kv affected within the family
- List<Map<String, Object>> qualifierDetails =
- new ArrayList<Map<String, Object>>();
- columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
- colCount += entry.getValue().size();
- if (maxCols <= 0) {
- continue;
- }
- // add details for each kv
- for (KeyValue kv : entry.getValue()) {
- if (--maxCols <= 0 ) {
- continue;
- }
- Map<String, Object> kvMap = kv.toStringMap();
- // row and family information are already available in the bigger map
- kvMap.remove("row");
- kvMap.remove("family");
- qualifierDetails.add(kvMap);
- }
- }
- map.put("totalColumns", colCount);
- return map;
- }
-
//Writable
public void readFields(final DataInput in) throws IOException {
int version = in.readByte();
@@ -348,6 +236,9 @@ public class Delete extends Operation
this.row = Bytes.readByteArray(in);
this.ts = in.readLong();
this.lockId = in.readLong();
+ if (version > 2) {
+ this.writeToWAL = in.readBoolean();
+ }
this.familyMap.clear();
int numFamilies = in.readInt();
for(int i=0;i<numFamilies;i++) {
@@ -361,6 +252,9 @@ public class Delete extends Operation
}
this.familyMap.put(family, list);
}
+ if (version > 1) {
+ readAttributes(in);
+ }
}
public void write(final DataOutput out) throws IOException {
@@ -368,6 +262,7 @@ public class Delete extends Operation
Bytes.writeByteArray(out, this.row);
out.writeLong(this.ts);
out.writeLong(this.lockId);
+ out.writeBoolean(this.writeToWAL);
out.writeInt(familyMap.size());
for(Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
Bytes.writeByteArray(out, entry.getKey());
@@ -377,6 +272,7 @@ public class Delete extends Operation
kv.write(out);
}
}
+ writeAttributes(out);
}
/**
@@ -408,4 +304,17 @@ public class Delete extends Operation
}
+ /**
+ * Compile the column family (i.e. schema) information
+ * into a Map. Useful for parsing and aggregation by debugging,
+ * logging, and administration tools.
+ * @return Map
+ */
+ @Override
+ public Map<String, Object> getFingerprint() {
+ Map<String, Object> map = super.getFingerprint();
+ map.put("operation", "Delete");
+ return map;
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java Wed May 2 20:52:03 2012
@@ -66,7 +66,8 @@ import java.util.TreeSet;
* <p>
* To add a filter, execute {@link #setFilter(Filter) setFilter}.
*/
-public class Get extends Operation implements Writable, Row, Comparable<Row> {
+public class Get extends OperationWithAttributes
+ implements Writable, Row, Comparable<Row> {
private static final byte GET_VERSION = (byte)3;
private byte [] row = null;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed May 2 20:52:03 2012
@@ -195,7 +195,7 @@ public interface HConnection extends Clo
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
- public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
+ public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
/**
@@ -236,6 +236,10 @@ public interface HConnection extends Clo
public void processBatchOfPuts(List<Put> list, final byte[] tableName)
throws IOException;
+ public int processBatchOfRowMutations(final List<RowMutations> list,
+ final byte[] tableName)
+ throws IOException;
+
/**
* Enable or disable region cache prefetch for the table. It will be
* applied for the given table's all HTable instances within this
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed May 2 20:52:03 2012
@@ -1525,6 +1525,31 @@ public class HConnectionManager {
return results;
}
+ public int processBatchOfRowMutations(final List<RowMutations> list,
+ final byte[] tableName)
+ throws IOException {
+ if (list.isEmpty()) return 0;
+ Batch<Object> b = new Batch<Object>(this) {
+ @SuppressWarnings("unchecked")
+ @Override
+ int doCall(final List<? extends Row> currentList, final byte [] row,
+ final byte[] tableName, Object whatevs)
+ throws IOException, RuntimeException {
+ final List<RowMutations> mutations = (List<RowMutations>)currentList;
+ getRegionServerWithRetries(new ServerCallable<Void>(this.c,
+ tableName, row) {
+ public Void call() throws IOException {
+ server.mutateRow(location.getRegionInfo().getRegionName(),
+ mutations);
+ return null;
+ }
+ });
+ return -1;
+ }
+ };
+ return b.process(list, tableName, new Object());
+ }
+
public int processBatchOfDeletes(final List<Delete> list,
final byte[] tableName)
throws IOException {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed May 2 20:52:03 2012
@@ -668,6 +668,28 @@ public class HTable implements HTableInt
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public void mutateRow(final RowMutations arm) throws IOException {
+ connection.getRegionServerWithRetries(
+ new ServerCallable<Void>(connection, tableName, arm.getRow()) {
+ public Void call() throws IOException {
+ server.mutateRow(location.getRegionInfo().getRegionName(), arm);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void mutateRow(final List<RowMutations> armList) throws IOException {
+ connection.processBatchOfRowMutations(armList, this.tableName);
+ }
+
+ /**
* Test for the existence of columns in the table, as specified in the Get.<p>
*
* This will return true if the Get matches one or more keys, false if not.<p>
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed May 2 20:52:03 2012
@@ -265,6 +265,18 @@ public interface HTableInterface {
long amount, boolean writeToWAL) throws IOException;
/**
+ * Performs multiple mutations atomically on a single row. Currently
+ * {@link Put} and {@link Delete} are supported.
+ *
+ * @param arm object that specifies the set of mutations to perform
+ * atomically
+ * @throws IOException
+ */
+ public void mutateRow(final RowMutations arm) throws IOException;
+
+ public void mutateRow(List<RowMutations> armList) throws IOException;
+
+ /**
* Tells whether or not 'auto-flush' is turned on.
*
* @return {@code true} if 'auto-flush' is enabled (default), meaning
@@ -310,4 +322,5 @@ public interface HTableInterface {
* @see #unlockRow
*/
void unlockRow(RowLock rl) throws IOException;
+
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed May 2 20:52:03 2012
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public abstract class Mutation extends OperationWithAttributes {
+ // Attribute used in Mutations to indicate the originating cluster.
+ private static final String CLUSTER_ID_ATTR = "_c.id_";
+
+ protected byte [] row = null;
+ protected long ts = HConstants.LATEST_TIMESTAMP;
+ protected long lockId = -1L;
+ protected boolean writeToWAL = true;
+ protected Map<byte [], List<KeyValue>> familyMap =
+ new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+
+ /**
+ * Compile the column family (i.e. schema) information
+ * into a Map. Useful for parsing and aggregation by debugging,
+ * logging, and administration tools.
+ * @return Map
+ */
+ @Override
+ public Map<String, Object> getFingerprint() {
+ Map<String, Object> map = new HashMap<String, Object>();
+ List<String> families = new ArrayList<String>();
+ // ideally, we would also include table information, but that information
+ // is not stored in each Operation instance.
+ map.put("families", families);
+ for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
+ families.add(Bytes.toStringBinary(entry.getKey()));
+ }
+ return map;
+ }
+
+ /**
+ * Compile the details beyond the scope of getFingerprint (row, columns,
+ * timestamps, etc.) into a Map along with the fingerprinted information.
+ * Useful for debugging, logging, and administration tools.
+ * @param maxCols a limit on the number of columns output prior to truncation
+ * @return Map
+ */
+ @Override
+ public Map<String, Object> toMap(int maxCols) {
+ // we start with the fingerprint map and build on top of it.
+ Map<String, Object> map = getFingerprint();
+ // replace the fingerprint's simple list of families with a
+ // map from column families to lists of qualifiers and kv details
+ Map<String, List<Map<String, Object>>> columns =
+ new HashMap<String, List<Map<String, Object>>>();
+ map.put("families", columns);
+ map.put("row", Bytes.toStringBinary(this.row));
+ int colCount = 0;
+ // iterate through all column families affected
+ for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
+ // map from this family to details for each kv affected within the family
+ List<Map<String, Object>> qualifierDetails =
+ new ArrayList<Map<String, Object>>();
+ columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
+ colCount += entry.getValue().size();
+ if (maxCols <= 0) {
+ continue;
+ }
+ // add details for each kv
+ for (KeyValue kv : entry.getValue()) {
+ if (--maxCols <= 0 ) {
+ continue;
+ }
+ Map<String, Object> kvMap = kv.toStringMap();
+ // row and family information are already available in the bigger map
+ kvMap.remove("row");
+ kvMap.remove("family");
+ qualifierDetails.add(kvMap);
+ }
+ }
+ map.put("totalColumns", colCount);
+ return map;
+ }
+
+ /**
+ * @return true if edits should be applied to WAL, false if not
+ */
+ public boolean getWriteToWAL() {
+ return this.writeToWAL;
+ }
+
+ /**
+ * Set whether this Delete should be written to the WAL or not.
+ * Not writing the WAL means you may lose edits on server crash.
+ * @param write true if edits should be written to WAL, false if not
+ */
+ public void setWriteToWAL(boolean write) {
+ this.writeToWAL = write;
+ }
+
+ /**
+ * Method for retrieving the put's familyMap
+ * @return familyMap
+ */
+ public Map<byte [], List<KeyValue>> getFamilyMap() {
+ return this.familyMap;
+ }
+
+ /**
+ * Method for setting the put's familyMap
+ */
+ public void setFamilyMap(Map<byte [], List<KeyValue>> map) {
+ this.familyMap = map;
+ }
+
+ /**
+ * Method to check if the familyMap is empty
+ * @return true if empty, false otherwise
+ */
+ public boolean isEmpty() {
+ return familyMap.isEmpty();
+ }
+
+ /**
+ * Method for retrieving the delete's row
+ * @return row
+ */
+ public byte [] getRow() {
+ return this.row;
+ }
+
+ public int compareTo(final Row d) {
+ return Bytes.compareTo(this.getRow(), d.getRow());
+ }
+
+ /**
+ * Method for retrieving the delete's RowLock
+ * @return RowLock
+ */
+ public RowLock getRowLock() {
+ return new RowLock(this.row, this.lockId);
+ }
+
+ /**
+ * Method for retrieving the delete's lock ID.
+ *
+ * @return The lock ID.
+ */
+ public long getLockId() {
+ return this.lockId;
+ }
+
+ /**
+ * Method for retrieving the timestamp
+ * @return timestamp
+ */
+ public long getTimeStamp() {
+ return this.ts;
+ }
+
+ /**
+ * @return the total number of KeyValues
+ */
+ public int size() {
+ int size = 0;
+ for(List<KeyValue> kvList : this.familyMap.values()) {
+ size += kvList.size();
+ }
+ return size;
+ }
+
+ /**
+ * @return the number of different families
+ */
+ public int numFamilies() {
+ return familyMap.size();
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java Wed May 2 20:52:03 2012
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.WritableUtils;
+
+public abstract class OperationWithAttributes extends Operation implements Attributes {
+ // a opaque blob of attributes
+ private Map<String, byte[]> attributes;
+
+ public void setAttribute(String name, byte[] value) {
+ if (attributes == null && value == null) {
+ return;
+ }
+
+ if (attributes == null) {
+ attributes = new HashMap<String, byte[]>();
+ }
+
+ if (value == null) {
+ attributes.remove(name);
+ if (attributes.isEmpty()) {
+ this.attributes = null;
+ }
+ } else {
+ attributes.put(name, value);
+ }
+ }
+
+ public byte[] getAttribute(String name) {
+ if (attributes == null) {
+ return null;
+ }
+
+ return attributes.get(name);
+ }
+
+ public Map<String, byte[]> getAttributesMap() {
+ if (attributes == null) {
+ return Collections.emptyMap();
+ }
+ return Collections.unmodifiableMap(attributes);
+ }
+
+ protected long getAttributeSize() {
+ long size = 0;
+ if (attributes != null) {
+ size += ClassSize.align(this.attributes.size() * ClassSize.MAP_ENTRY);
+ for(Map.Entry<String, byte[]> entry : this.attributes.entrySet()) {
+ size += ClassSize.align(ClassSize.STRING + entry.getKey().length());
+ size += ClassSize.align(ClassSize.ARRAY + entry.getValue().length);
+ }
+ }
+ return size;
+ }
+
+ protected void writeAttributes(final DataOutput out) throws IOException {
+ if (this.attributes == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(this.attributes.size());
+ for (Map.Entry<String, byte[]> attr : this.attributes.entrySet()) {
+ WritableUtils.writeString(out, attr.getKey());
+ Bytes.writeByteArray(out, attr.getValue());
+ }
+ }
+ }
+
+ protected void readAttributes(final DataInput in) throws IOException {
+ int numAttributes = in.readInt();
+ if (numAttributes > 0) {
+ this.attributes = new HashMap<String, byte[]>();
+ for(int i=0; i<numAttributes; i++) {
+ String name = WritableUtils.readString(in);
+ byte[] value = Bytes.readByteArray(in);
+ this.attributes.put(name, value);
+ }
+ }
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java Wed May 2 20:52:03 2012
@@ -45,20 +45,12 @@ import java.util.TreeMap;
* for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
* {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp.
*/
-public class Put extends Operation
+public class Put extends Mutation
implements HeapSize, Writable, Row, Comparable<Row> {
- private static final byte PUT_VERSION = (byte)1;
-
- private byte [] row = null;
- private long timestamp = HConstants.LATEST_TIMESTAMP;
- private long lockId = -1L;
- private boolean writeToWAL = true;
-
- private Map<byte [], List<KeyValue>> familyMap =
- new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+ private static final byte PUT_VERSION = (byte)2;
private static final long OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + ClassSize.REFERENCE +
+ ClassSize.OBJECT + 2 * ClassSize.REFERENCE +
2 * Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN +
ClassSize.REFERENCE + ClassSize.TREEMAP);
@@ -103,7 +95,7 @@ public class Put extends Operation
throw new IllegalArgumentException("Row key is invalid");
}
this.row = Arrays.copyOf(row, row.length);
- this.timestamp = ts;
+ this.ts = ts;
if(rowLock != null) {
this.lockId = rowLock.getLockId();
}
@@ -114,7 +106,7 @@ public class Put extends Operation
* @param putToCopy put to copy
*/
public Put(Put putToCopy) {
- this(putToCopy.getRow(), putToCopy.timestamp, putToCopy.getRowLock());
+ this(putToCopy.getRow(), putToCopy.ts, putToCopy.getRowLock());
this.familyMap =
new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
for(Map.Entry<byte [], List<KeyValue>> entry :
@@ -132,7 +124,7 @@ public class Put extends Operation
* @return this
*/
public Put add(byte [] family, byte [] qualifier, byte [] value) {
- return add(family, qualifier, this.timestamp, value);
+ return add(family, qualifier, this.ts, value);
}
/**
@@ -199,7 +191,7 @@ public class Put extends Operation
* existing KeyValue object in the family map.
*/
public boolean has(byte [] family, byte [] qualifier) {
- return has(family, qualifier, this.timestamp, new byte[0], true, true);
+ return has(family, qualifier, this.ts, new byte[0], true, true);
}
/**
@@ -229,7 +221,7 @@ public class Put extends Operation
* existing KeyValue object in the family map.
*/
public boolean has(byte [] family, byte [] qualifier, byte [] value) {
- return has(family, qualifier, this.timestamp, value, true, false);
+ return has(family, qualifier, this.ts, value, true, false);
}
/**
@@ -328,154 +320,6 @@ public class Put extends Operation
return list;
}
- /**
- * Method for retrieving the put's familyMap
- * @return familyMap
- */
- public Map<byte [], List<KeyValue>> getFamilyMap() {
- return this.familyMap;
- }
-
- /**
- * Method for retrieving the put's row
- * @return row
- */
- public byte [] getRow() {
- return this.row;
- }
-
- /**
- * Method for retrieving the put's RowLock
- * @return RowLock
- */
- public RowLock getRowLock() {
- return new RowLock(this.row, this.lockId);
- }
-
- /**
- * Method for retrieving the put's lockId
- * @return lockId
- */
- public long getLockId() {
- return this.lockId;
- }
-
- /**
- * Method to check if the familyMap is empty
- * @return true if empty, false otherwise
- */
- public boolean isEmpty() {
- return familyMap.isEmpty();
- }
-
- /**
- * @return Timestamp
- */
- public long getTimeStamp() {
- return this.timestamp;
- }
-
- /**
- * @return the number of different families included in this put
- */
- public int numFamilies() {
- return familyMap.size();
- }
-
- /**
- * @return the total number of KeyValues that will be added with this put
- */
- public int size() {
- int size = 0;
- for(List<KeyValue> kvList : this.familyMap.values()) {
- size += kvList.size();
- }
- return size;
- }
-
- /**
- * @return true if edits should be applied to WAL, false if not
- */
- public boolean getWriteToWAL() {
- return this.writeToWAL;
- }
-
- /**
- * Set whether this Put should be written to the WAL or not.
- * Not writing the WAL means you may lose edits on server crash.
- * @param write true if edits should be written to WAL, false if not
- */
- public void setWriteToWAL(boolean write) {
- this.writeToWAL = write;
- }
-
- /**
- * Compile the column family (i.e. schema) information
- * into a Map. Useful for parsing and aggregation by debugging,
- * logging, and administration tools.
- * @return Map
- */
- @Override
- public Map<String, Object> getFingerprint() {
- Map<String, Object> map = new HashMap<String, Object>();
- List<String> families = new ArrayList<String>();
- // ideally, we would also include table information, but that information
- // is not stored in each Operation instance.
- map.put("families", families);
- for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
- families.add(Bytes.toStringBinary(entry.getKey()));
- }
- return map;
- }
-
- /**
- * Compile the details beyond the scope of getFingerprint (row, columns,
- * timestamps, etc.) into a Map along with the fingerprinted information.
- * Useful for debugging, logging, and administration tools.
- * @param maxCols a limit on the number of columns output prior to truncation
- * @return Map
- */
- @Override
- public Map<String, Object> toMap(int maxCols) {
- // we start with the fingerprint map and build on top of it.
- Map<String, Object> map = getFingerprint();
- // replace the fingerprint's simple list of families with a
- // map from column families to lists of qualifiers and kv details
- Map<String, List<Map<String, Object>>> columns =
- new HashMap<String, List<Map<String, Object>>>();
- map.put("families", columns);
- map.put("row", Bytes.toStringBinary(this.row));
- int colCount = 0;
- // iterate through all column families affected by this Put
- for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
- // map from this family to details for each kv affected within the family
- List<Map<String, Object>> qualifierDetails =
- new ArrayList<Map<String, Object>>();
- columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
- colCount += entry.getValue().size();
- if (maxCols <= 0) {
- continue;
- }
- // add details for each kv
- for (KeyValue kv : entry.getValue()) {
- if (--maxCols <= 0 ) {
- continue;
- }
- Map<String, Object> kvMap = kv.toStringMap();
- // row and family information are already available in the bigger map
- kvMap.remove("row");
- kvMap.remove("family");
- qualifierDetails.add(kvMap);
- }
- }
- map.put("totalColumns", colCount);
- return map;
- }
-
- public int compareTo(Row p) {
- return Bytes.compareTo(this.getRow(), p.getRow());
- }
-
//HeapSize
public long heapSize() {
long heapsize = OVERHEAD;
@@ -502,6 +346,8 @@ public class Put extends Operation
heapsize += kv.heapSize();
}
}
+ heapsize += getAttributeSize();
+
return ClassSize.align((int)heapsize);
}
@@ -513,7 +359,7 @@ public class Put extends Operation
throw new IOException("version not supported");
}
this.row = Bytes.readByteArray(in);
- this.timestamp = in.readLong();
+ this.ts = in.readLong();
this.lockId = in.readLong();
this.writeToWAL = in.readBoolean();
int numFamilies = in.readInt();
@@ -533,13 +379,16 @@ public class Put extends Operation
}
this.familyMap.put(family, keys);
}
+ if (version > 1) {
+ readAttributes(in);
+ }
}
public void write(final DataOutput out)
throws IOException {
out.writeByte(PUT_VERSION);
Bytes.writeByteArray(out, this.row);
- out.writeLong(this.timestamp);
+ out.writeLong(this.ts);
out.writeLong(this.lockId);
out.writeBoolean(this.writeToWAL);
out.writeInt(familyMap.size());
@@ -557,10 +406,24 @@ public class Put extends Operation
out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
}
}
+ writeAttributes(out);
}
/**
- * Add the specified column and value, with the specified timestamp as
+ * Compile the column family (i.e. schema) information
+ * into a Map. Useful for parsing and aggregation by debugging,
+ * logging, and administration tools.
+ * @return Map
+ */
+ @Override
+ public Map<String, Object> getFingerprint() {
+ Map<String, Object> map = super.getFingerprint();
+ map.put("operation", "Put");
+ return map;
+ }
+
+ /**
+ * Add the specified column and value, with the specified timestamp as
* its version to this Put operation.
* @param column Old style column name with family and qualifier put together
* with a colon.
@@ -572,5 +435,6 @@ public class Put extends Operation
public Put add(byte [] column, long ts, byte [] value) {
byte [][] parts = KeyValue.parseColumn(column);
return add(parts[0], parts[1], ts, value);
+
}
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java Wed May 2 20:52:03 2012
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Performs multiple mutations atomically on a single row.
+ * Currently {@link Put} and {@link Delete} are supported.
+ *
+ * The mutations are performed in the order in which they
+ * were added.
+ */
+public class RowMutations extends Operation implements Row {
+ private List<Mutation> mutations = new ArrayList<Mutation>();
+ protected byte [] row;
+ private static final byte VERSION = (byte)1;
+
+ /** Constructor for Writable. DO NOT USE */
+ public RowMutations() {}
+
+ /**
+ * Create an atomic mutation for the specified row.
+ * @param row row key
+ */
+ public RowMutations(byte [] row) {
+ if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
+ throw new IllegalArgumentException("Row key is invalid");
+ }
+ this.row = Arrays.copyOf(row, row.length);
+ }
+
+ /**
+ * Add a {@link Put} operation to the list of mutations
+ * @param p The {@link Put} to add
+ * @throws IOException
+ */
+ public void add(Put p) throws IOException {
+ internalAdd(p);
+ }
+
+ /**
+ * Add a {@link Delete} operation to the list of mutations
+ * @param d The {@link Delete} to add
+ * @throws IOException
+ */
+ public void add(Delete d) throws IOException {
+ internalAdd(d);
+ }
+
+ private void internalAdd(Mutation m) throws IOException {
+ int res = Bytes.compareTo(this.row, m.getRow());
+ if(res != 0) {
+ throw new IOException("The row in the recently added Put/Delete " +
+ Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
+ Bytes.toStringBinary(this.row));
+ }
+ mutations.add(m);
+ }
+
+ @Override
+ public void readFields(final DataInput in) throws IOException {
+ int version = in.readByte();
+ if (version > VERSION) {
+ throw new IOException("version not supported");
+ }
+ this.row = Bytes.readByteArray(in);
+ int numMutations = in.readInt();
+ mutations.clear();
+ for(int i=0; i<numMutations; i++) {
+ mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
+ }
+ }
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+ out.writeByte(VERSION);
+ Bytes.writeByteArray(out, this.row);
+ out.writeInt(mutations.size());
+ for (Mutation m : mutations) {
+ HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
+ }
+ }
+
+ @Override
+ public int compareTo(Row i) {
+ return Bytes.compareTo(this.getRow(), i.getRow());
+ }
+
+ @Override
+ public byte[] getRow() {
+ return row;
+ }
+
+ /**
+ * @return An unmodifiable list of the current mutations.
+ */
+ public List<Mutation> getMutations() {
+ return Collections.unmodifiableList(mutations);
+ }
+
+ /**
+ * Compile the column family (i.e. schema) information
+ * into a Map. Useful for parsing and aggregation by debugging,
+ * logging, and administration tools.
+ *
+ * e.g: {num-delete=1, num-put=1, row=testRow}
+ * @return Map
+ */
+ @Override
+ public Map<String, Object> getFingerprint() {
+ Map<String, Object> map = new HashMap<String, Object>();
+ List<String> mutationsList = new ArrayList<String>();
+ // ideally, we would also include table information, but that information
+ // is not stored in each Operation instance.
+ map.put("row", Bytes.toStringBinary(this.row));
+ int deleteCnt = 0, putCnt = 0;
+ for (Mutation mod: this.mutations) {
+ if (mod instanceof Put) {
+ putCnt++;
+ }
+ else {
+ deleteCnt++;
+ }
+ }
+ map.put("num-put", putCnt);
+ map.put("num-delete", deleteCnt);
+ return map;
+ }
+
+ /**
+ * Compile the details beyond the scope of getFingerprint (row, columns,
+ * timestamps, etc.) into a Map along with the fingerprinted information.
+ * Useful for debugging, logging, and administration tools.
+ * @param maxCols a limit on the number of columns output prior to truncation
+ * e.g:
+ * {mutations=
+ * [Delete:{"operation":"Delete","totalColumns":1,"families":{...},"row":"testRow"},
+ * Put:{"operation":"Put","totalColumns":1,"families":{...},"row":"testRow"}],
+ * "num-delete"=1,
+ * "num-put"=1,
+ * "row"="testRow"}
+ * @return Map
+ */
+ @Override
+ public Map<String, Object> toMap(int maxCols) {
+ // we start with the fingerprint map and build on top of it.
+ Map<String, Object> map = getFingerprint();
+ // replace the fingerprint's simple list of families with a
+ // map from column families to lists of qualifiers and kv details
+ List<Map<String, Object>> mutationDetails =
+ new ArrayList<Map<String, Object>>();
+ map.put("row", Bytes.toStringBinary(this.row));
+ map.put("mutations", mutationDetails);
+ // iterate through all the mutations add the map for the mutation
+ int count = 0;
+ for (Mutation mod: this.mutations) {
+ mutationDetails.add(mod.toMap(maxCols));
+ if (++count > maxCols)
+ break;
+ }
+ return map;
+ }
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed May 2 20:52:03 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
@@ -160,6 +161,8 @@ public class HbaseObjectWritable impleme
addToMap(Result[].class, code++);
addToMap(Scan.class, code++);
+ addToMap(RowMutations.class, code++);
+
addToMap(WhileMatchFilter.class, code++);
addToMap(PrefixFilter.class, code++);
addToMap(PageFilter.class, code++);
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed May 2 20:52:03 2012
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Restartable;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
@@ -264,6 +265,12 @@ public interface HRegionInterface extend
public long openScanner(final byte [] regionName, final Scan scan)
throws IOException;
+ public void mutateRow(byte[] regionName, RowMutations arm)
+ throws IOException;
+
+ public void mutateRow(byte[] regionName, List<RowMutations> armList)
+ throws IOException;
+
/**
* Get the next set of values
* @param scannerId clientId passed to openScanner
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed May 2 20:52:03 2012
@@ -72,11 +72,15 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -1712,6 +1716,60 @@ public class HRegion implements HeapSize
}
}
+ /**
+ * Setup a Delete object with correct timestamps.
+ * Caller should the row and region locks.
+ * @param delete
+ * @param now
+ * @throws IOException
+ */
+ private void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow)
+ throws IOException {
+ for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+
+ byte[] family = e.getKey();
+ List<KeyValue> kvs = e.getValue();
+ Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+ for (KeyValue kv: kvs) {
+ // Check if time is LATEST, change to time of most recent addition if so
+ // This is expensive.
+ if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+ byte[] qual = kv.getQualifier();
+ if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
+ Integer count = kvCount.get(qual);
+ if (count == null) {
+ kvCount.put(qual, 1);
+ } else {
+ kvCount.put(qual, count + 1);
+ }
+ count = kvCount.get(qual);
+
+ Get get = new Get(kv.getRow());
+ get.setMaxVersions(count);
+ get.addColumn(family, qual);
+
+ List<KeyValue> result = get(get);
+
+ if (result.size() < count) {
+ // Nothing to delete
+ kv.updateLatestStamp(byteNow);
+ continue;
+ }
+ if (result.size() > count) {
+ throw new RuntimeException("Unexpected size: " + result.size());
+ }
+ KeyValue getkv = result.get(count - 1);
+ Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+ getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+ } else {
+ kv.updateLatestStamp(byteNow);
+ }
+ }
+ }
+ }
+
/**
* @param familyMap map of family to edits for the given family.
@@ -1729,50 +1787,7 @@ public class HRegion implements HeapSize
updatesLock.readLock().lock();
try {
-
- for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-
- byte[] family = e.getKey();
- List<KeyValue> kvs = e.getValue();
- Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
- for (KeyValue kv: kvs) {
- // Check if time is LATEST, change to time of most recent addition if so
- // This is expensive.
- if (kv.isLatestTimestamp() && kv.isDeleteType()) {
- byte[] qual = kv.getQualifier();
- if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
-
- Integer count = kvCount.get(qual);
- if (count == null) {
- kvCount.put(qual, 1);
- } else {
- kvCount.put(qual, count + 1);
- }
- count = kvCount.get(qual);
-
- Get get = new Get(kv.getRow());
- get.setMaxVersions(count);
- get.addColumn(family, qual);
-
- List<KeyValue> result = get(get);
-
- if (result.size() < count) {
- // Nothing to delete
- kv.updateLatestStamp(byteNow);
- continue;
- }
- if (result.size() > count) {
- throw new RuntimeException("Unexpected size: " + result.size());
- }
- KeyValue getkv = result.get(count - 1);
- Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
- getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
- } else {
- kv.updateLatestStamp(byteNow);
- }
- }
- }
+ prepareDeleteTimestamps(familyMap, byteNow);
if (writeToWAL) {
// write/sync to WAL should happen before we touch memstore.
@@ -2311,11 +2326,16 @@ public class HRegion implements HeapSize
* new entries.
*/
private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
+ return applyFamilyMapToMemstore(familyMap, null);
+ }
+
+ private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
+ MultiVersionConsistencyControl.WriteEntry writeEntryToUse) {
long start = EnvironmentEdgeManager.currentTimeMillis();
MultiVersionConsistencyControl.WriteEntry w = null;
long size = 0;
try {
- w = mvcc.beginMemstoreInsert();
+ w = (writeEntryToUse == null)? mvcc.beginMemstoreInsert(): writeEntryToUse;
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -2328,12 +2348,15 @@ public class HRegion implements HeapSize
}
}
} finally {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- HRegion.memstoreInsertTime.addAndGet(now - start);
- start = now;
- mvcc.completeMemstoreInsert(w);
- now = EnvironmentEdgeManager.currentTimeMillis();
- HRegion.mvccWaitTime.addAndGet(now - start);
+ if (writeEntryToUse == null) {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ HRegion.memstoreInsertTime.addAndGet(now - start);
+ start = now;
+ mvcc.completeMemstoreInsert(w);
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ HRegion.mvccWaitTime.addAndGet(now - start);
+ }
+ // else the calling function will take care of the mvcc completion and metrics.
}
return size;
@@ -3655,6 +3678,92 @@ public class HRegion implements HeapSize
return results;
}
+
+ public void mutateRow(RowMutations rm) throws IOException {
+ boolean flush = false;
+
+ Integer lid = null;
+
+ splitsAndClosesLock.readLock().lock();
+ try {
+ // 1. run all pre-hooks before the atomic operation
+ // NOT required for 0.89
+
+ // one WALEdit is used for all edits.
+ WALEdit walEdit = new WALEdit();
+
+ // 2. acquire the row lock
+ lid = getLock(null, rm.getRow(), true);
+
+ // 3. acquire the region lock
+ this.updatesLock.readLock().lock();
+
+ // 4. Get a mvcc write number
+ MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
+
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ byte[] byteNow = Bytes.toBytes(now);
+ try {
+ // 5. Check mutations and apply edits to a single WALEdit
+ for (Mutation m : rm.getMutations()) {
+ if (m instanceof Put) {
+ Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
+ checkFamilies(familyMap.keySet());
+ checkTimestamps(familyMap, now);
+ updateKVTimestamps(familyMap.values(), byteNow);
+ } else if (m instanceof Delete) {
+ Delete d = (Delete) m;
+ prepareDelete(d);
+ Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
+ prepareDeleteTimestamps(familyMap, byteNow);
+ } else {
+ throw new DoNotRetryIOException(
+ "Action must be Put or Delete. But was: "
+ + m.getClass().getName());
+ }
+ if (m.getWriteToWAL()) {
+ addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
+ }
+ }
+
+ // 6. append/sync all edits at once
+ // TODO: Do batching as in doMiniBatchPut
+ this.log.append(regionInfo, this.getTableDesc().getName(), walEdit, now);
+
+ // 7. apply to memstore
+ long addedSize = 0;
+ for (Mutation m : rm.getMutations()) {
+ addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
+ }
+ flush = isFlushSize(this.incMemoryUsage(addedSize));
+ } finally {
+ // 8. roll mvcc forward
+ long start = now;
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ HRegion.memstoreInsertTime.addAndGet(now - start);
+
+ mvcc.completeMemstoreInsert(w);
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ HRegion.mvccWaitTime.addAndGet(now - start);
+
+ // 9. release region lock
+ this.updatesLock.readLock().unlock();
+ }
+ // 10. run all coprocessor post hooks, after region lock is released
+ // NOT required in 0.89. coprocessors are not supported.
+
+ } finally {
+ if (lid != null) {
+ // 11. release the row lock
+ releaseRowLock(lid);
+ }
+ if (flush) {
+ // 12. Flush cache if needed. Do it outside update lock.
+ requestFlush();
+ }
+ splitsAndClosesLock.readLock().unlock();
+ }
+ }
/**
*
* @param row
@@ -3979,4 +4088,5 @@ public class HRegion implements HeapSize
if (bc != null) bc.shutdown();
}
}
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed May 2 20:52:03 2012
@@ -88,6 +88,10 @@ import org.apache.hadoop.hbase.Stoppable
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HMsg.Type;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
@@ -2039,6 +2043,35 @@ public class HRegionServer implements HR
}
@Override
+ public void mutateRow(byte[] regionName, List<RowMutations> armList)
+ throws IOException {
+ checkOpen();
+ if (regionName == null) {
+ throw new IOException("Invalid arguments to atomicMutation " +
+ "regionName is null");
+ }
+ requestCount.incrementAndGet();
+ try {
+ HRegion region = getRegion(regionName);
+ if (!region.getRegionInfo().isMetaTable()) {
+ this.cacheFlusher.reclaimMemStoreMemory();
+ }
+ for (RowMutations arm: armList) {
+ this.requestCount.incrementAndGet();
+ region.mutateRow(arm);
+ }
+ } catch (Throwable t) {
+ throw convertThrowableToIOE(cleanup(t));
+ }
+ }
+
+ @Override
+ public void mutateRow(byte[] regionName, RowMutations arm)
+ throws IOException {
+ mutateRow(regionName, Collections.singletonList(arm));
+ }
+
+ @Override
public boolean exists(byte [] regionName, Get get) throws IOException {
checkOpen();
requestCount.incrementAndGet();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Wed May 2 20:52:03 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -71,7 +72,7 @@ public class RemoteHTable implements HTa
final long sleepTime;
@SuppressWarnings("unchecked")
- protected String buildRowSpec(final byte[] row, final Map familyMap,
+ protected String buildRowSpec(final byte[] row, final Map familyMap,
final long startTime, final long endTime, final int maxVersions) {
StringBuffer sb = new StringBuffer();
sb.append('/');
@@ -144,7 +145,7 @@ public class RemoteHTable implements HTa
byte[][] split = KeyValue.parseColumn(cell.getColumn());
byte[] column = split[0];
byte[] qualifier = split.length > 1 ? split[1] : null;
- kvs.add(new KeyValue(row.getKey(), column, qualifier,
+ kvs.add(new KeyValue(row.getKey(), column, qualifier,
cell.getTimestamp(), cell.getValue()));
}
results.add(new Result(kvs));
@@ -238,7 +239,7 @@ public class RemoteHTable implements HTa
TableSchemaModel schema = new TableSchemaModel();
schema.getObjectFromMessage(response.getBody());
return schema.getTableDescriptor();
- case 509:
+ case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
@@ -306,7 +307,7 @@ public class RemoteHTable implements HTa
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
- sb.append('/');
+ sb.append('/');
}
sb.append(Bytes.toStringBinary(name));
sb.append('/');
@@ -364,7 +365,7 @@ public class RemoteHTable implements HTa
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
- sb.append('/');
+ sb.append('/');
}
sb.append(Bytes.toStringBinary(name));
sb.append("/$multiput"); // can be any nonexistent row
@@ -495,7 +496,7 @@ public class RemoteHTable implements HTa
}
return results[0];
}
-
+
class Iter implements Iterator<Result> {
Result cache;
@@ -529,7 +530,7 @@ public class RemoteHTable implements HTa
public void remove() {
throw new RuntimeException("remove() not supported");
}
-
+
}
@Override
@@ -602,4 +603,15 @@ public class RemoteHTable implements HTa
throw new IOException("incrementColumnValue not supported");
}
+
+ @Override
+ public void mutateRow(RowMutations arm) throws IOException {
+ throw new IOException("atomicMutation not supported");
+ }
+
+ @Override
+ public void mutateRow(List<RowMutations> armList)
+ throws IOException {
+ throw new IOException("atomicMutation not supported");
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java Wed May 2 20:52:03 2012
@@ -29,11 +29,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Ignore;
@@ -77,6 +79,137 @@ public class TestAcidGuarantees {
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
util = new HBaseTestingUtility(conf);
}
+ /**
+ * Thread that does random full-row writes into a table.
+ */
+ public static class RowMutationsWriter extends RepeatingTestThread {
+ Random rand = new Random();
+ byte data[] = new byte[10];
+ byte targetRows[][];
+ byte targetFamilies[][];
+ HTable table;
+ AtomicLong numWritten = new AtomicLong();
+
+ public RowMutationsWriter(TestContext ctx, byte targetRows[][],
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.targetRows = targetRows;
+ this.targetFamilies = targetFamilies;
+ table = new HTable(ctx.getConf(), TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ // Pick a random row to write into
+ byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+ RowMutations mutation = new RowMutations(targetRow);
+
+ long writeNumber = numWritten.incrementAndGet();
+ /*
+ * Do a mutation in which we write a random value in to col-0 and
+ * 100-that value in col 1; and delete the rest of the columns
+ * Should maintain the invariant that the sum of the two columns is
+ * always 100.
+ */
+ for (byte[] family : targetFamilies) {
+ long value = rand.nextInt();
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ if ((i + writeNumber) % NUM_COLS_TO_CHECK == 0) {
+ Put p = new Put(targetRow);
+ p.add(family, qualifier, writeNumber, Bytes.toBytes(value));
+ mutation.add(p);
+ }
+ else if ((i + writeNumber) % NUM_COLS_TO_CHECK == 1) {
+ Put p = new Put(targetRow);
+ p.add(family, qualifier, writeNumber, Bytes.toBytes(100 - value));
+ mutation.add(p);
+ }
+ else {
+ Delete d = new Delete(targetRow);
+ d.deleteColumns(family, qualifier, writeNumber);
+ mutation.add(d);
+ }
+ }
+ }
+ table.mutateRow(mutation);
+ }
+ }
+
+ /**
+ * Thread that does single-row reads in a table, looking for partially
+ * completed rows.
+ */
+ public static class RowMutationsGetReader extends RepeatingTestThread {
+ byte targetRow[];
+ byte targetFamilies[][];
+ HTable table;
+ int numVerified = 0;
+ AtomicLong numRead = new AtomicLong();
+ // initialized to true if we ever get a non-null result
+ boolean gotResults = false;
+
+ public RowMutationsGetReader(TestContext ctx, byte targetRow[],
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.targetRow = targetRow;
+ this.targetFamilies = targetFamilies;
+ table = new HTable(ctx.getConf(), TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ Get g = new Get(targetRow);
+ Result res = table.get(g);
+ if (res.getRow() == null) {
+ // Trying to verify but we didn't find the row - the writing
+ // thread probably just hasn't started writing yet, so we can
+ // ignore this action -- only if we have never received values
+ // before this.
+
+ if (gotResults)
+ gotFailure(res);
+
+ return;
+ }
+
+ gotResults = true;
+
+ for (byte[] family : targetFamilies) {
+ long sum = 0;
+ int countNonNull = 0;
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ byte thisValue[] = res.getValue(family, qualifier);
+ if (thisValue != null) {
+ countNonNull++;
+ sum += Bytes.toLong(thisValue);
+ }
+ numVerified++;
+ }
+ if (sum != 100 || countNonNull != 2)
+ gotFailure(res);
+ }
+ numRead.getAndIncrement();
+ }
+
+ private void gotFailure(Result res) {
+ StringBuilder msg = new StringBuilder();
+ msg.append("Failed after ").append(numVerified).append("!");
+ msg.append("Expecting 2 KV each for " + targetFamilies.length
+ + " families. Summing up to 100.");
+ msg.append("Got:\n");
+ if (res.list() == null)
+ msg.append("None");
+ else {
+ for (KeyValue kv : res.list()) {
+ msg.append(kv.toString());
+ msg.append(" val= (long)");
+ msg.append(Bytes.toLong(kv.getValue()));
+ msg.append("\n");
+ }
+ throw new RuntimeException(msg.toString());
+ }
+ }
+ }
/**
* Thread that does random full-row writes into a table.
@@ -291,6 +424,63 @@ public class TestAcidGuarantees {
}
}
+
+ @Test
+ public void testMutations() throws Exception {
+ util.startMiniCluster(1);
+ try {
+ long millisToRun = 5000;
+ int numWriters = 5;
+ int numGetters = 5;
+ int numUniqueRows = 1;
+
+ createTableIfMissing();
+ TestContext ctx = new TestContext(util.getConfiguration());
+
+ byte rows[][] = new byte[numUniqueRows][];
+ for (int i = 0; i < numUniqueRows; i++) {
+ rows[i] = Bytes.toBytes("test_row_" + i);
+ }
+
+ List<RowMutationsWriter> writers = Lists.newArrayList();
+ for (int i = 0; i < numWriters; i++) {
+ RowMutationsWriter writer = new RowMutationsWriter(
+ ctx, rows, FAMILIES);
+ writers.add(writer);
+ ctx.addThread(writer);
+ }
+
+ // Add a flusher
+ ctx.addThread(new RepeatingTestThread(ctx) {
+ public void doAnAction() throws Exception {
+ util.flush();
+ }
+ });
+
+ List<RowMutationsGetReader> getters = Lists.newArrayList();
+ for (int i = 0; i < numGetters; i++) {
+ RowMutationsGetReader getter = new RowMutationsGetReader(
+ ctx, rows[i % numUniqueRows], FAMILIES);
+ getters.add(getter);
+ ctx.addThread(getter);
+ }
+
+ ctx.startThreads();
+ ctx.waitFor(millisToRun);
+ ctx.stop();
+
+ LOG.info("Finished test. Writers:");
+ for (RowMutationsWriter writer : writers) {
+ LOG.info(" wrote " + writer.numWritten.get());
+ }
+ LOG.info("Readers:");
+ for (RowMutationsGetReader reader : getters) {
+ LOG.info(" read " + reader.numRead.get());
+ }
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
@Test
public void testGetAtomicity() throws Exception {
util.startMiniCluster(1);
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed May 2 20:52:03 2012
@@ -33,6 +33,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -4290,5 +4291,33 @@ public class TestFromClientSide {
assertTrue(addrAfter.getPort() != addrCache.getPort());
assertEquals(addrAfter.getPort(), addrNoCache.getPort());
}
+
+ @Test
+ public void testAtomicRowMutation() throws Exception {
+ LOG.info("Starting testAtomicRowMutation");
+ final byte [] TABLENAME = Bytes.toBytes("testAtomicRowMutation");
+ HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+ byte [][] QUALIFIERS = new byte [][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b")
+ };
+ RowMutations arm = new RowMutations(ROW);
+// arm.add(new Delete(ROW));
+ Put p = new Put(ROW);
+ p.add(FAMILY, QUALIFIERS[0], VALUE);
+ arm.add(p);
+ t.mutateRow(arm);
+
+ Get g = new Get(ROW);
+ Result r = t.get(g);
+ // delete was first, row should exist
+ assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
+
+ arm = new RowMutations(ROW);
+ arm.add(p);
+ arm.add(new Delete(ROW));
+ t.mutateRow(Arrays.asList((RowMutations)arm));
+ r = t.get(g);
+ assertTrue(r.isEmpty());
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java Wed May 2 20:52:03 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -363,6 +364,25 @@ public class TestOperation {
kvMap = (Map) familyInfo.get(0);
assertEquals("Qualifier incorrect in Delete.toJSON()",
Bytes.toStringBinary(QUALIFIER), kvMap.get("qualifier"));
+
+ // produce a RowMutations operation
+ RowMutations rmut = new RowMutations(ROW);
+ rmut.add(delete);
+ rmut.add(put);
+ // get its JSON representation, and parse it
+ json = rmut.toJSON();
+ parsedJSON = mapper.readValue(json, HashMap.class);
+ // check for the row
+ assertEquals("row absent in RowMutations.toJSON()",
+ Bytes.toStringBinary(ROW), parsedJSON.get("row"));
+ // check for the family and the qualifier.
+ List<Map<String, Object>> mutationDetails = ((ArrayList)parsedJSON.get("mutations"));
+ assertEquals("Expecting to find one delete and one put",
+ 2, mutationDetails.size());
+ assertEquals("Should find one Put",
+ 1, parsedJSON.get("num-put"));
+ assertEquals("Should find one Delete",
+ 1, parsedJSON.get("num-delete"));
}
/**