You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/11 20:24:48 UTC

svn commit: r1230200 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/client/coprocessor/ main/java/org/apache/hadoop/hbase/coprocessor/ test/java/org/apache/hadoop/hbase/coprocessor/

Author: tedyu
Date: Wed Jan 11 19:24:48 2012
New Revision: 1230200

URL: http://svn.apache.org/viewvc?rev=1230200&view=rev
Log:
HBASE-5139 Compute (weighted) median using AggregateProtocol

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Wed Jan 11 19:24:48 2012
@@ -23,13 +23,20 @@ package org.apache.hadoop.hbase.client.c
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
@@ -362,4 +369,152 @@ public class AggregationClient {
     return res;
   }
 
+  /**
+   * It helps locate the region with median for a given column whose weight
+   * is specified in an optional column.
+   * From individual regions, it obtains sum of values and sum of weights.
+   * @param tableName name of the table to aggregate over
+   * @param ci interpreter used to add up the sums reported by the regions
+   * @param scan scan defining the row range, column family and qualifier(s)
+   * @return pair whose first element is a map between start row of the region
+   *  and (sum of values, sum of weights) for the region, the second element is
+   *  (sum of values, sum of weights) for all the regions chosen
+   * @throws Throwable
+   */
+  private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
+  getMedianArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+    validateParameters(scan);
+    final NavigableMap<byte[], List<S>> map =
+      new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
+    class MedianCallback implements Batch.Callback<List<S>> {
+      S sumVal = null, sumWeights = null;
+
+      public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
+        List<S> l = new ArrayList<S>();
+        l.add(sumVal);
+        l.add(sumWeights);
+        return new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
+      }
+
+      @Override
+      public synchronized void update(byte[] region, byte[] row, List<S> result) {
+        // record the region's sums under its row key and fold them into the totals
+        map.put(row, result);
+        sumVal = ci.add(sumVal, result.get(0));
+        sumWeights = ci.add(sumWeights, result.get(1));
+      }
+    }
+    MedianCallback callback = new MedianCallback();
+    HTable table = new HTable(conf, tableName);
+    try {
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(),
+          new Batch.Call<AggregateProtocol, List<S>>() {
+            @Override
+            public List<S> call(AggregateProtocol instance)
+                throws IOException {
+              return instance.getMedian(ci, scan);
+            }
+          }, callback);
+    } finally {
+      // release the table even when the coprocessor call fails
+      table.close();
+    }
+    return callback.getMedianParams();
+  }
+
+  /**
+   * This is the client side interface/handler for calling the median method for a
+   * given cf-cq combination. This method collects the necessary parameters
+   * to compute the median and returns the median.
+   * The first qualifier of the scanned family is the value column; when more
+   * than one qualifier is given, the last one is used as the weight column.
+   * @param tableName name of the table to aggregate over
+   * @param ci interpreter used to read and accumulate the column values
+   * @param scan scan defining the row range, column family and qualifier(s)
+   * @return R the median; may be null, e.g. when the scan ends before the
+   *  running sum passes half of the total
+   * @throws Throwable
+   */
+  public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
+      Scan scan) throws Throwable {
+    Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
+    byte[] startRow = null;
+    byte[] colFamily = scan.getFamilies()[0];
+    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+    NavigableMap<byte[], List<S>> map = p.getFirst();
+    S sumVal = p.getSecond().get(0);
+    S sumWeights = p.getSecond().get(1);
+    double halfSumVal = ci.divideForAvg(sumVal, 2L);
+    double movingSumVal = 0;
+    boolean weighted = false;
+    if (quals.size() > 1) {
+      // a second qualifier means the running sum is taken over the weight column
+      weighted = true;
+      halfSumVal = ci.divideForAvg(sumWeights, 2L);
+    }
+
+    // NOTE(review): startRow ends up as the key of the last region already added
+    // to movingSumVal, so the scan below reads that region again; confirm this
+    // is the intended starting point.
+    for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
+      S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
+      double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+      if (newSumVal > halfSumVal) break;  // we found the region with the median
+      movingSumVal = newSumVal;
+      startRow = entry.getKey();
+    }
+    // scan the region with median and find it
+    Scan scan2 = new Scan(scan);
+    // inherit stop row from method parameter; startRow is still null when the
+    // first region already passes the halfway mark, so keep the copied start row
+    if (startRow != null) {
+      scan2.setStartRow(startRow);
+    }
+    int cacheSize = scan2.getCaching();
+    if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
+      scan2.setCacheBlocks(true);
+      cacheSize = 5;
+      scan2.setCaching(cacheSize);
+    }
+    // read the qualifiers without removing them from the caller's scan
+    byte[] qualifier = quals.isEmpty() ? null : quals.first();
+    // qualifier for the weight column
+    byte[] weightQualifier = weighted ? quals.last() : qualifier;
+    R value = null;
+    HTable table = new HTable(conf, tableName);
+    try {
+      ResultScanner scanner = table.getScanner(scan2);
+      try {
+        Result[] results = null;
+        do {
+          results = scanner.next(cacheSize);
+          if (results != null && results.length > 0) {
+            for (int i = 0; i < results.length; i++) {
+              Result r = results[i];
+              // retrieve weight
+              KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
+              R newValue = ci.getValue(colFamily, weightQualifier, kv);
+              S s = ci.castToReturnType(newValue);
+              double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+              // see if we have moved past the median
+              if (newSumVal > halfSumVal) {
+                return value;
+              }
+              movingSumVal = newSumVal;
+              kv = r.getColumnLatest(colFamily, qualifier);
+              value = ci.getValue(colFamily, qualifier, kv);
+            }
+          }
+        } while (results != null && results.length > 0);
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      // release the table on every exit path, including the early return
+      table.close();
+    }
+    return null;
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java Wed Jan 11 19:24:48 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.coproces
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -232,4 +233,60 @@ public class AggregateImplementation ext
     return p;
   }
 
+  /**
+   * Computes, for the rows of this region selected by the scan, the sum of the
+   * value column and the sum of the optional weight column.
+   * The first qualifier of the scanned family is the value column; when more
+   * than one qualifier is given, the last one is the weight column.
+   * @param ci interpreter used to read and add the cell values
+   * @param scan scan defining the row range, column family and qualifier(s)
+   * @return list holding the sum of values followed by the sum of weights; the
+   *  second element is the interpreter's minimum value when the weight sum is null
+   * @throws IOException
+   */
+  @Override
+  public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
+  throws IOException {
+    S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
+
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+    .getRegion().getScanner(scan);
+    byte[] colFamily = scan.getFamilies()[0];
+    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+    byte[] valQualifier = quals.pollFirst();
+    // if weighted median is requested, get qualifier for the weight column;
+    // the value qualifier was already removed above, so any remaining entry
+    // means a weight column was supplied
+    byte[] weightQualifier = quals.isEmpty() ? null : quals.pollLast();
+    List<KeyValue> results = new ArrayList<KeyValue>();
+
+    boolean hasMoreRows = false;
+    try {
+      do {
+        tempVal = null;
+        tempWeight = null;
+        hasMoreRows = scanner.next(results);
+        for (KeyValue kv : results) {
+          // route each cell to the sum of the column it belongs to
+          if (weightQualifier != null && kv.matchingQualifier(weightQualifier)) {
+            tempWeight = ci.add(tempWeight,
+                ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
+          } else {
+            tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
+                valQualifier, kv)));
+          }
+        }
+        results.clear();
+        sumVal = ci.add(sumVal, tempVal);
+        sumWeights = ci.add(sumWeights, tempWeight);
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    List<S> l = new ArrayList<S>();
+    l.add(sumVal);
+    l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
+    return l;
+  }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java Wed Jan 11 19:24:48 2012
@@ -126,4 +126,19 @@ public interface AggregateProtocol exten
   <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
       throws IOException;
 
-}
\ No newline at end of file
+  /**
+   * Gives a List containing sum of values and sum of weights.
+   * It is computed for the combination of column
+   * family and column qualifier(s) in the given row range as defined in the
+   * Scan object. In its current implementation, it takes one column family and
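+   * up to two column qualifiers. The first qualifier is for the values column
+   * and the second qualifier (optional) is for the weight column.
+   * @param ci interpreter used to read and add the column values
+   * @param scan scan defining the row range, column family and qualifier(s)
+   * @return List holding the sum of values followed by the sum of weights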
+   * @throws IOException
+   */
+  <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException;
+
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java?rev=1230200&r1=1230199&r2=1230200&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java Wed Jan 11 19:24:48 2012
@@ -124,6 +124,23 @@ public class TestAggregateProtocol {
   }
 
   /**
+   * ****************** Test cases for Median **********************
+   */
+  /**
+   * @throws Throwable
+   */
+  @Test
+  public void testMedianWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long median = aClient.median(TEST_TABLE, ci,
+        scan);
+    assertEquals(8L, median);
+  }
+
+  /**
    * **************************** ROW COUNT Test cases *******************
    */