You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/08/02 20:03:26 UTC

svn commit: r1368625 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/filter/ main/java/org/apache/hadoop/hbase/mapreduce/ main/protobuf/ test/java/org/apache/hadoop/hbase/filter/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: tedyu
Date: Thu Aug  2 18:03:26 2012
New Revision: 1368625

URL: http://svn.apache.org/viewvc?rev=1368625&view=rev
Log:
HBASE-6468 RowCounter may return incorrect result if column name is specified in command line (Shrijeet)


Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
    hbase/trunk/hbase-server/src/main/protobuf/Filter.proto

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java?rev=1368625&r1=1368624&r2=1368625&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java Thu Aug  2 18:03:26 2012
@@ -60,6 +60,21 @@ public class FirstKeyOnlyFilter extends 
     return new FirstKeyOnlyFilter();
   }
 
+  /**
+   * @return true if the first KV has been found; false otherwise.
+   */
+  protected boolean hasFoundKV() {
+    return this.foundKV;
+  }
+
+  /**
+   * Updates the {@link #foundKV} flag.
+   * @param value the new value of the flag.
+   */
+  protected void setFoundKV(boolean value) {
+    this.foundKV = value;
+  }
+
   public void write(DataOutput out) throws IOException {
   }
 

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java?rev=1368625&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java Thu Aug  2 18:03:26 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mapreduce.RowCounter;
+
+/**
+ * The filter looks for the given columns in KeyValue. Once there is a match for
+ * any one of the columns, it returns ReturnCode.NEXT_ROW for remaining
+ * KeyValues in the row.
+ * <p>
+ * Note : It may emit KVs which do not have the given columns in them, if
+ * these KVs happen to occur before a KV which does have a match. Given this
+ * caveat, this filter is only useful for special cases like {@link RowCounter}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FirstKeyValueMatchingQualifiersFilter extends FirstKeyOnlyFilter {
+
+  // NOTE(review): write()/readFields() are not overridden in this class, so
+  // this set is presumably not shipped with the serialized filter -- confirm.
+  private Set<byte []> qualifiers;
+
+  /** This constructor should not be used. */
+  public FirstKeyValueMatchingQualifiersFilter() {
+    qualifiers = Collections.emptySet();
+  }
+
+  /**
+   * Constructor which takes a set of columns. As soon as the first KeyValue
+   * matching any of these columns is found, the filter moves to the next row.
+   * @param qualifiers the set of columns to be matched.
+   */
+  public FirstKeyValueMatchingQualifiersFilter(Set<byte []> qualifiers) {
+    this.qualifiers = qualifiers;
+  }
+
+  /** Includes KVs through the first match, then returns NEXT_ROW. */
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    if (hasFoundKV()) {
+      return ReturnCode.NEXT_ROW;
+    } else if (hasOneMatchingQualifier(v)) {
+      setFoundKV(true);
+    }
+    return ReturnCode.INCLUDE;
+  }
+
+  private boolean hasOneMatchingQualifier(KeyValue v) {
+    for (byte[] q : qualifiers) {
+      if (v.matchingQualifier(q)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java?rev=1368625&r1=1368624&r2=1368625&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java Thu Aug  2 18:03:26 2012
@@ -20,15 +20,17 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
@@ -114,6 +116,7 @@ public class RowCounter {
     job.setJarByClass(RowCounter.class);
     Scan scan = new Scan();
     scan.setCacheBlocks(false);
+    Set<byte []> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
     if (startKey != null && !startKey.equals("")) {
       scan.setStartRow(Bytes.toBytes(startKey));
     }
@@ -127,10 +130,20 @@ public class RowCounter {
         if(fields.length == 1) {
           scan.addFamily(Bytes.toBytes(fields[0]));
         } else {
-          scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
+          byte[] qualifier = Bytes.toBytes(fields[1]);
+          qualifiers.add(qualifier);
+          scan.addColumn(Bytes.toBytes(fields[0]), qualifier);
         }
       }
     }
+    // specified column may or may not be part of first key value for the row.
+    // Hence do not use FirstKeyOnlyFilter if scan has columns, instead use
+    // FirstKeyValueMatchingQualifiersFilter.
+    if (qualifiers.size() == 0) {
+      scan.setFilter(new FirstKeyOnlyFilter());
+    } else {
+      scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
+    }
     job.setOutputFormatClass(NullOutputFormat.class);
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
       RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);

Modified: hbase/trunk/hbase-server/src/main/protobuf/Filter.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Filter.proto?rev=1368625&r1=1368624&r2=1368625&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Filter.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/Filter.proto Thu Aug  2 18:03:26 2012
@@ -65,6 +65,9 @@ message FamilyFilter {
 message FirstKeyOnlyFilter {
 }
 
+message FirstKeyValueMatchingQualifiersFilter {
+}
+
 message InclusiveStopFilter {
   required bytes stopRowKey = 1;
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java?rev=1368625&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java Thu Aug  2 18:03:26 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestFirstKeyValueMatchingQualifiersFilter extends TestCase {
+  private static final byte[] ROW = Bytes.toBytes("test");
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("test");
+  private static final byte[] COLUMN_QUALIFIER_1 = Bytes.toBytes("foo");
+  private static final byte[] COLUMN_QUALIFIER_2 = Bytes.toBytes("foo_2");
+  private static final byte[] COLUMN_QUALIFIER_3 = Bytes.toBytes("foo_3");
+  private static final byte[] VAL_1 = Bytes.toBytes("a");
+
+  /**
+   * Test the functionality of
+   * {@link FirstKeyValueMatchingQualifiersFilter#filterKeyValue(KeyValue)}
+   *
+   * @throws Exception
+   */
+  public void testFirstKeyMatchingQualifierFilter() throws Exception {
+    Set<byte[]> quals = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    quals.add(COLUMN_QUALIFIER_1);
+    quals.add(COLUMN_QUALIFIER_2);
+    Filter filter = new FirstKeyValueMatchingQualifiersFilter(quals);
+
+    // Match in first attempt
+    KeyValue kv;
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1);
+    assertEquals("includeAndSetFlag",
+        Filter.ReturnCode.INCLUDE, filter.filterKeyValue(kv));
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+    assertEquals("flagIsSetSkipToNextRow",
+        Filter.ReturnCode.NEXT_ROW, filter.filterKeyValue(kv));
+
+    // A mismatch in first attempt and match in second attempt.
+    filter.reset();
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_3, VAL_1);
+    // COLUMN_QUALIFIER_3 is not in the set: the KV is included, flag stays unset.
+    assertEquals("includeFlagIsUnset",
+        Filter.ReturnCode.INCLUDE, filter.filterKeyValue(kv));
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+    assertEquals("includeAndSetFlag",
+        Filter.ReturnCode.INCLUDE, filter.filterKeyValue(kv));
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1);
+    assertEquals("flagIsSetSkipToNextRow",
+        Filter.ReturnCode.NEXT_ROW, filter.filterKeyValue(kv));
+  }
+
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java?rev=1368625&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java Thu Aug  2 18:03:26 2012
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the rowcounter map reduce job.
+ */
+@Category(MediumTests.class)
+public class TestRowCounter {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String TABLE_NAME = "testRowCounter";
+  private final static String COL_FAM = "col_fam";
+  private final static String COL1 = "c1";
+  private final static String COL2 = "c2";
+  private final static int TOTAL_ROWS = 10;
+  private final static int ROWS_WITH_ONE_COL = 2;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+    HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME),
+        Bytes.toBytes(COL_FAM));
+    writeRows(table);
+    table.close();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+  }
+
+  /**
+   * Test a case when no column was specified in command line arguments.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterNoColumn() throws Exception {
+    String[] args = new String[] {
+        TABLE_NAME
+    };
+    runRowCount(args, 10);
+  }
+
+  /**
+   * Test a case when the column specified in command line arguments is
+   * exclusive for few rows.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterExclusiveColumn() throws Exception {
+    String[] args = new String[] {
+        TABLE_NAME, COL_FAM + ":" + COL1
+    };
+    runRowCount(args, 8);
+  }
+
+  /**
+   * Test a case when the column specified in command line arguments is not part
+   * of first KV for a row.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterHiddenColumn() throws Exception {
+    String[] args = new String[] {
+        TABLE_NAME, COL_FAM + ":" + COL2
+    };
+    runRowCount(args, 10);
+  }
+
+  /**
+   * Run the RowCounter map reduce job and verify the row count.
+   * 
+   * @param args the command line arguments to be used for rowcounter job.
+   * @param expectedCount the expected row count (result of map reduce job).
+   * @throws Exception
+   */
+  private void runRowCount(String[] args, int expectedCount) throws Exception {
+    GenericOptionsParser opts = new GenericOptionsParser(
+        TEST_UTIL.getConfiguration(), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    Job job = RowCounter.createSubmittableJob(conf, args);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    Counter counter = job.getCounters().findCounter(
+        RowCounterMapper.Counters.ROWS);
+    assertEquals(expectedCount, counter.getValue());
+  }
+
+  /**
+   * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
+   * two columns, Few have one.
+   * 
+   * @param table
+   * @throws IOException
+   */
+  private static void writeRows(HTable table) throws IOException {
+    final byte[] family = Bytes.toBytes(COL_FAM);
+    final byte[] value = Bytes.toBytes("abcd");
+    final byte[] col1 = Bytes.toBytes(COL1);
+    final byte[] col2 = Bytes.toBytes(COL2);
+    ArrayList<Put> rowsUpdate = new ArrayList<Put>();
+    // write few rows with two columns
+    int i = 0;
+    for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col1, value);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+
+    // write few rows with only one column
+    for (; i < TOTAL_ROWS; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+    table.put(rowsUpdate);
+  }
+}