You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/08/02 20:03:26 UTC
svn commit: r1368625 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/filter/
main/java/org/apache/hadoop/hbase/mapreduce/ main/protobuf/
test/java/org/apache/hadoop/hbase/filter/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: tedyu
Date: Thu Aug 2 18:03:26 2012
New Revision: 1368625
URL: http://svn.apache.org/viewvc?rev=1368625&view=rev
Log:
HBASE-6468 RowCounter may return incorrect result if column name is specified in command line (Shrijeet)
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
hbase/trunk/hbase-server/src/main/protobuf/Filter.proto
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java?rev=1368625&r1=1368624&r2=1368625&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java Thu Aug 2 18:03:26 2012
@@ -60,6 +60,21 @@ public class FirstKeyOnlyFilter extends
return new FirstKeyOnlyFilter();
}
+  /**
+   * @return {@code true} once the first KeyValue has been found, else {@code false}
+   */
+  protected boolean hasFoundKV() {
+    return foundKV;
+  }
+
+  /**
+   * Overwrites the flag reported by {@link #hasFoundKV()}.
+   * @param value the new value of the {@code foundKV} flag
+   */
+  protected void setFoundKV(boolean value) {
+    foundKV = value;
+  }
+
+  /** Serializes nothing; subclasses that carry state must override this. */
public void write(DataOutput out) throws IOException {
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java?rev=1368625&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java Thu Aug 2 18:03:26 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mapreduce.RowCounter;
+
+/**
+ * The filter looks for the given columns in KeyValue. Once there is a match for
+ * any one of the columns, it returns ReturnCode.NEXT_ROW for remaining
+ * KeyValues in the row.
+ * <p>
+ * Note : It may emit KVs which do not have the given columns in them, if
+ * these KVs happen to occur before a KV which does have a match. Given this
+ * caveat, this filter is only useful for special cases like {@link RowCounter}.
+ * <p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FirstKeyValueMatchingQualifiersFilter extends FirstKeyOnlyFilter {
+
+ private Set<byte []> qualifiers;
+
+ /**
+ * This constructor should not be used.
+ */
+ public FirstKeyValueMatchingQualifiersFilter() {
+ qualifiers = Collections.emptySet();
+ }
+
+ /**
+ * Constructor which takes a set of columns. As soon as first KeyValue
+ * matching any of these columns is found, filter moves to next row.
+ *
+ * @param qualifiers the set of columns to me matched.
+ */
+ public FirstKeyValueMatchingQualifiersFilter(Set<byte []> qualifiers) {
+ this.qualifiers = qualifiers;
+ }
+
+ public ReturnCode filterKeyValue(KeyValue v) {
+ if (hasFoundKV()) {
+ return ReturnCode.NEXT_ROW;
+ } else if (hasOneMatchingQualifier(v)) {
+ setFoundKV(true);
+ }
+ return ReturnCode.INCLUDE;
+ }
+
+ private boolean hasOneMatchingQualifier(KeyValue v) {
+ for (byte[] q : qualifiers) {
+ if (v.matchingQualifier(q)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java?rev=1368625&r1=1368624&r2=1368625&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java Thu Aug 2 18:03:26 2012
@@ -20,15 +20,17 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
@@ -114,6 +116,7 @@ public class RowCounter {
job.setJarByClass(RowCounter.class);
Scan scan = new Scan();
scan.setCacheBlocks(false);
+ Set<byte []> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
if (startKey != null && !startKey.equals("")) {
scan.setStartRow(Bytes.toBytes(startKey));
}
@@ -127,10 +130,20 @@ public class RowCounter {
if(fields.length == 1) {
scan.addFamily(Bytes.toBytes(fields[0]));
} else {
- scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
+ byte[] qualifier = Bytes.toBytes(fields[1]);
+ qualifiers.add(qualifier);
+ scan.addColumn(Bytes.toBytes(fields[0]), qualifier);
}
}
}
+ // specified column may or may not be part of first key value for the row.
+ // Hence do not use FirstKeyOnlyFilter if scan has columns, instead use
+ // FirstKeyValueMatchingQualifiersFilter.
+ if (qualifiers.size() == 0) {
+ scan.setFilter(new FirstKeyOnlyFilter());
+ } else {
+ scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
+ }
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
Modified: hbase/trunk/hbase-server/src/main/protobuf/Filter.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Filter.proto?rev=1368625&r1=1368624&r2=1368625&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Filter.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/Filter.proto Thu Aug 2 18:03:26 2012
@@ -65,6 +65,11 @@ message FamilyFilter {
message FirstKeyOnlyFilter {
}
+message FirstKeyValueMatchingQualifiersFilter {
+  // Column qualifiers the filter looks for; an empty message could not carry them.
+  repeated bytes qualifiers = 1;
+}
+
message InclusiveStopFilter {
required bytes stopRowKey = 1;
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java?rev=1368625&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java Thu Aug 2 18:03:26 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestFirstKeyValueMatchingQualifiersFilter extends TestCase {
+  private static final byte[] ROW = Bytes.toBytes("test");
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("test");
+  private static final byte[] COLUMN_QUALIFIER_1 = Bytes.toBytes("foo");
+  private static final byte[] COLUMN_QUALIFIER_2 = Bytes.toBytes("foo_2");
+  private static final byte[] COLUMN_QUALIFIER_3 = Bytes.toBytes("foo_3");
+  private static final byte[] VAL_1 = Bytes.toBytes("a");
+
+  /**
+   * Test the functionality of
+   * {@link FirstKeyValueMatchingQualifiersFilter#filterKeyValue(KeyValue)}
+   *
+   * @throws Exception
+   */
+  public void testFirstKeyMatchingQualifierFilter() throws Exception {
+    Set<byte[]> quals = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    quals.add(COLUMN_QUALIFIER_1);
+    quals.add(COLUMN_QUALIFIER_2);
+    Filter filter = new FirstKeyValueMatchingQualifiersFilter(quals);
+
+    // Match in first attempt
+    KeyValue kv;
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1);
+    assertEquals("includeAndSetFlag", Filter.ReturnCode.INCLUDE,
+        filter.filterKeyValue(kv));
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+    assertEquals("flagIsSetSkipToNextRow", Filter.ReturnCode.NEXT_ROW,
+        filter.filterKeyValue(kv));
+
+    // A mismatch in first attempt and match in second attempt.
+    // filterKeyValue() is stateful, so each KeyValue is passed exactly once.
+    filter.reset();
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_3, VAL_1);
+    assertEquals("includeFlagIsUnset", Filter.ReturnCode.INCLUDE,
+        filter.filterKeyValue(kv));
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+    assertEquals("includeAndSetFlag", Filter.ReturnCode.INCLUDE,
+        filter.filterKeyValue(kv));
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1);
+    assertEquals("flagIsSetSkipToNextRow", Filter.ReturnCode.NEXT_ROW,
+        filter.filterKeyValue(kv));
+  }
+
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java?rev=1368625&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java Thu Aug 2 18:03:26 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the rowcounter map reduce job.
+ */
+@Category(MediumTests.class)
+public class TestRowCounter {
+  private static final Log LOG = LogFactory.getLog(TestRowCounter.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String TABLE_NAME = "testRowCounter";
+  private final static String COL_FAM = "col_fam";
+  private final static String COL1 = "c1";
+  private final static String COL2 = "c2";
+  private final static int TOTAL_ROWS = 10;
+  private final static int ROWS_WITH_ONE_COL = 2;
+
+  /**
+   * Starts the HBase and MapReduce mini clusters and writes the test rows.
+   *
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+    HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME),
+        Bytes.toBytes(COL_FAM));
+    try {
+      writeRows(table);
+    } finally {
+      table.close();
+    }
+  }
+
+  /**
+   * Stops the mini clusters in the reverse of the order they were started:
+   * the MapReduce cluster first, then the HBase mini cluster.
+   *
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test a case when no column was specified in command line arguments.
+   * Every row must be counted.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterNoColumn() throws Exception {
+    String[] args = new String[] {
+      TABLE_NAME
+    };
+    runRowCount(args, TOTAL_ROWS);
+  }
+
+  /**
+   * Test a case when the column specified in command line arguments is
+   * present in only some of the rows; only those rows must be counted.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterExclusiveColumn() throws Exception {
+    String[] args = new String[] {
+      TABLE_NAME, COL_FAM + ":" + COL1
+    };
+    runRowCount(args, TOTAL_ROWS - ROWS_WITH_ONE_COL);
+  }
+
+  /**
+   * Test a case when the column specified in command line arguments is not part
+   * of the first KV for some rows (it sorts after COL1). Every row contains it,
+   * so every row must be counted.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterHiddenColumn() throws Exception {
+    String[] args = new String[] {
+      TABLE_NAME, COL_FAM + ":" + COL2
+    };
+    runRowCount(args, TOTAL_ROWS);
+  }
+
+  /**
+   * Run the RowCounter map reduce job and verify the row count.
+   *
+   * @param args the command line arguments to be used for rowcounter job.
+   * @param expectedCount the expected row count (result of map reduce job).
+   * @throws Exception
+   */
+  private void runRowCount(String[] args, int expectedCount) throws Exception {
+    GenericOptionsParser opts = new GenericOptionsParser(
+        TEST_UTIL.getConfiguration(), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    Job job = RowCounter.createSubmittableJob(conf, args);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    Counter counter = job.getCounters().findCounter(
+        RowCounterMapper.Counters.ROWS);
+    assertEquals(expectedCount, counter.getValue());
+  }
+
+  /**
+   * Writes TOTAL_ROWS distinct rows into the table: the first
+   * TOTAL_ROWS - ROWS_WITH_ONE_COL rows have both COL1 and COL2, the remaining
+   * ROWS_WITH_ONE_COL rows have only COL2.
+   *
+   * @param table the table to write to
+   * @throws IOException
+   */
+  private static void writeRows(HTable table) throws IOException {
+    final byte[] family = Bytes.toBytes(COL_FAM);
+    final byte[] value = Bytes.toBytes("abcd");
+    final byte[] col1 = Bytes.toBytes(COL1);
+    final byte[] col2 = Bytes.toBytes(COL2);
+    ArrayList<Put> rowsUpdate = new ArrayList<Put>();
+    // rows with both columns
+    int i = 0;
+    for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col1, value);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+
+    // rows with only COL2
+    for (; i < TOTAL_ROWS; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+    table.put(rowsUpdate);
+  }
+}