You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/11 20:24:48 UTC
svn commit: r1230200 - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/client/coprocessor/
main/java/org/apache/hadoop/hbase/coprocessor/
test/java/org/apache/hadoop/hbase/coprocessor/
Author: tedyu
Date: Wed Jan 11 19:24:48 2012
New Revision: 1230200
URL: http://svn.apache.org/viewvc?rev=1230200&view=rev
Log:
HBASE-5139 Compute (weighted) median using AggregateProtocol
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Wed Jan 11 19:24:48 2012
@@ -23,13 +23,20 @@ package org.apache.hadoop.hbase.client.c
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
@@ -362,4 +369,133 @@ public class AggregationClient {
return res;
}
+ /**
+ * It helps locate the region with median for a given column whose weight
+ * is specified in an optional column.
+ * From individual regions, it obtains sum of values and sum of weights.
+ * @param tableName
+ * @param ci
+ * @param scan
+ * @return pair whose first element is a map between start row of the region
+ * and (sum of values, sum of weights) for the region, the second element is
+ * (sum of values, sum of weights) for all the regions chosen
+ * @throws Throwable
+ */
+ private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
+ getMedianArgs(final byte[] tableName,
+ final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+ validateParameters(scan);
+ final NavigableMap<byte[], List<S>> map =
+ new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
+ class StdCallback implements Batch.Callback<List<S>> {
+ S sumVal = null, sumWeights = null;
+
+ public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
+ List<S> l = new ArrayList<S>();
+ l.add(sumVal);
+ l.add(sumWeights);
+ Pair<NavigableMap<byte[], List<S>>, List<S>> p =
+ new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
+ return p;
+ }
+
+ @Override
+ public synchronized void update(byte[] region, byte[] row, List<S> result) {
+ map.put(row, result);
+ sumVal = ci.add(sumVal, result.get(0));
+ sumWeights = ci.add(sumWeights, result.get(1));
+ }
+ }
+ StdCallback stdCallback = new StdCallback();
+ HTable table = new HTable(conf, tableName);
+ table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+ .getStopRow(),
+ new Batch.Call<AggregateProtocol, List<S>>() {
+ @Override
+ public List<S> call(AggregateProtocol instance)
+ throws IOException {
+ return instance.getMedian(ci, scan);
+ }
+
+ }, stdCallback);
+ return stdCallback.getMedianParams();
+ }
+
+ /**
+ * This is the client side interface/handler for calling the median method for a
+ * given cf-cq combination. This method collects the necessary parameters
+ * to compute the median and returns the median.
+ * @param tableName
+ * @param ci
+ * @param scan
+ * @return R the median
+ * @throws Throwable
+ */
+ public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
+ Scan scan) throws Throwable {
+ Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
+ byte[] startRow = null;
+ byte[] colFamily = scan.getFamilies()[0];
+ NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+ NavigableMap<byte[], List<S>> map = p.getFirst();
+ S sumVal = p.getSecond().get(0);
+ S sumWeights = p.getSecond().get(1);
+ double halfSumVal = ci.divideForAvg(sumVal, 2L);
+ double movingSumVal = 0;
+ boolean weighted = false;
+ if (quals.size() > 1) {
+ weighted = true;
+ halfSumVal = ci.divideForAvg(sumWeights, 2L);
+ }
+
+ for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
+ S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
+ double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+ if (newSumVal > halfSumVal) break; // we found the region with the median
+ movingSumVal = newSumVal;
+ startRow = entry.getKey();
+ }
+ // scan the region with median and find it
+ Scan scan2 = new Scan(scan);
+ // inherit stop row from method parameter
+ scan2.setStartRow(startRow);
+ HTable table = new HTable(conf, tableName);
+ int cacheSize = scan2.getCaching();
+ if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
+ scan2.setCacheBlocks(true);
+ cacheSize = 5;
+ scan2.setCaching(cacheSize);
+ }
+ ResultScanner scanner = table.getScanner(scan2);
+ Result[] results = null;
+ byte[] qualifier = quals.pollFirst();
+ // qualifier for the weight column
+ byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
+ R value = null;
+ try {
+ do {
+ results = scanner.next(cacheSize);
+ if (results != null && results.length > 0) {
+ for (int i = 0; i < results.length; i++) {
+ Result r = results[i];
+ // retrieve weight
+ KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
+ R newValue = ci.getValue(colFamily, weightQualifier, kv);
+ S s = ci.castToReturnType(newValue);
+ double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+ // see if we have moved past the median
+ if (newSumVal > halfSumVal) {
+ return value;
+ }
+ movingSumVal = newSumVal;
+ kv = r.getColumnLatest(colFamily, qualifier);
+ value = ci.getValue(colFamily, qualifier, kv);
+ }
+ }
+ } while (results != null && results.length > 0);
+ } finally {
+ scanner.close();
+ }
+ return null;
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java Wed Jan 11 19:24:48 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.coproces
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -232,4 +233,45 @@ public class AggregateImplementation ext
return p;
}
+ @Override
+ public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
+ throws IOException {
+ S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
+
+ InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+ .getRegion().getScanner(scan);
+ byte[] colFamily = scan.getFamilies()[0];
+ NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+ byte[] valQualifier = quals.pollFirst();
+ // if weighted median is requested, get qualifier for the weight column
+ byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
+ List<KeyValue> results = new ArrayList<KeyValue>();
+
+ boolean hasMoreRows = false;
+ try {
+ do {
+ tempVal = null;
+ tempWeight = null;
+ hasMoreRows = scanner.next(results);
+ for (KeyValue kv : results) {
+ tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
+ valQualifier, kv)));
+ if (weightQualifier != null) {
+ tempWeight = ci.add(tempWeight,
+ ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
+ }
+ }
+ results.clear();
+ sumVal = ci.add(sumVal, tempVal);
+ sumWeights = ci.add(sumWeights, tempWeight);
+ } while (hasMoreRows);
+ } finally {
+ scanner.close();
+ }
+ List<S> l = new ArrayList<S>();
+ l.add(sumVal);
+ l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
+ return l;
+ }
+
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java Wed Jan 11 19:24:48 2012
@@ -126,4 +126,19 @@ public interface AggregateProtocol exten
<T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
-}
\ No newline at end of file
  /**
   * Gives a List containing sum of values and sum of weights.
   * It is computed for the combination of column
   * family and column qualifier(s) in the given row range as defined in the
   * Scan object. In its current implementation, it takes one column family and
   * one or two column qualifiers. The first qualifier is for the values column
   * and the second qualifier (optional) is for the weight column.
   * @param ci column interpreter used to decode and add cell values
   * @param scan scan selecting the row range, family and qualifier(s)
   * @return List of two elements: sum of values, then sum of weights
   * @throws IOException
   */
  <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
      throws IOException;
+
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java Wed Jan 11 19:24:48 2012
@@ -124,6 +124,23 @@ public class TestAggregateProtocol {
}
/**
+ * ****************** Test cases for Median **********************
+ */
+ /**
+ * @throws Throwable
+ */
+ @Test
+ public void testMedianWithValidRange() throws Throwable {
+ AggregationClient aClient = new AggregationClient(conf);
+ Scan scan = new Scan();
+ scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+ final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ long median = aClient.median(TEST_TABLE, ci,
+ scan);
+ assertEquals(8L, median);
+ }
+
+ /**
* **************************** ROW COUNT Test cases *******************
*/