Posted to commits@hbase.apache.org by ra...@apache.org on 2013/10/24 20:34:28 UTC
svn commit: r1535487 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/
Author: ramkrishna
Date: Thu Oct 24 18:34:27 2013
New Revision: 1535487
URL: http://svn.apache.org/r1535487
Log:
HBASE-9767 - Support OperationAttributes in ImportTSV Parser (Ram)
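
In short: ImportTsv's column specification gains an HBASE_ATTRIBUTES_KEY token. The field in that column carries per-record operation attributes written as key=>value pairs, comma-separated when there are several, so a tab-separated input row can look like: row1 <tab> cellvalue <tab> attr1=>val1,attr2=>val2. The hunks below thread this through TsvParser, turn TsvImporterMapper's KeyValue creation into an overridable createPuts() hook, and add tests for both.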
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1535487&r1=1535486&r2=1535487&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Thu Oct 24 18:34:27 2013
@@ -82,8 +82,10 @@ public class ImportTsv extends Configure
public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-
+ public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
final static String DEFAULT_SEPARATOR = "\t";
+ final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
+ final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
public static class TsvParser {
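
The two new separator constants define how an attributes field is tokenized: "=>" splits a key from its value, and "," splits one attribute from the next. A minimal standalone sketch of that tokenization (the class name and input are invented for illustration; the constant values mirror DEFAULT_ATTRIBUTES_SEPERATOR and DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR above):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AttributeTokenizerSketch {
      // Mirrors DEFAULT_ATTRIBUTES_SEPERATOR and DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR.
      static final String KEY_VALUE_SEP = "=>";
      static final String MULTI_ATTR_SEP = ",";

      public static void main(String[] args) {
        String field = "attr1=>val1,attr2=>val2";
        Map<String, String> attrs = new LinkedHashMap<String, String>();
        for (String attr : field.split(MULTI_ATTR_SEP)) {
          String[] kv = attr.split(KEY_VALUE_SEP);
          if (kv.length == 2 && kv[0].length() > 0 && kv[1].length() > 0) {
            attrs.put(kv[0], kv[1]);   // keep only well-formed key=>value pairs
          }
        }
        System.out.println(attrs);     // {attr1=val1, attr2=val2}
      }
    }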
@@ -108,11 +110,20 @@ public class ImportTsv extends Configure
public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
+ public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
+
+ private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+
+ public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
/**
* @param columnsSpecification the list of columns to parser out, comma separated.
* The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+ * @param tagSeperatorStr
*/
- public TsvParser(String columnsSpecification, String separatorStr) {
+ public TsvParser(String columnsSpecification, String seperatorStr) {
+ this(columnsSpecification, seperatorStr, null);
+ }
+ public TsvParser(String columnsSpecification, String separatorStr, String tagSeperatorStr) {
// Configure separator
byte[] separator = Bytes.toBytes(separatorStr);
Preconditions.checkArgument(separator.length == 1,
@@ -133,12 +144,14 @@ public class ImportTsv extends Configure
rowKeyColumnIndex = i;
continue;
}
-
if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
timestampKeyColumnIndex = i;
continue;
}
-
+ if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+ attrKeyColumnIndex = i;
+ continue;
+ }
String[] parts = str.split(":", 2);
if (parts.length == 1) {
families[i] = str.getBytes();
@@ -158,6 +171,13 @@ public class ImportTsv extends Configure
return timestampKeyColumnIndex;
}
+ public boolean hasAttributes() {
+ return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+ }
+
+ public int getAttributesKeyColumnIndex() {
+ return attrKeyColumnIndex;
+ }
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
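
With the accessors above in place, the attributes column is declared the same way as the row key or timestamp column. A quick sketch of the parser-side API added in this commit (assumes hbase-server on the classpath; the column list is illustrative):

    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;

    public class ParserSetupSketch {
      public static void main(String[] args) {
        TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:c1,HBASE_ATTRIBUTES_KEY", "\t");
        System.out.println(parser.hasAttributes());                // true
        System.out.println(parser.getAttributesKeyColumnIndex());  // 2
      }
    }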
@@ -190,6 +210,8 @@ public class ImportTsv extends Configure
} else if (hasTimestamp()
&& tabOffsets.size() <= getTimestampKeyColumnIndex()) {
throw new BadTsvLineException("No timestamp");
+ } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
+ throw new BadTsvLineException("No attributes specified");
}
return new ParsedLine(tabOffsets, lineBytes);
}
@@ -226,6 +248,41 @@ public class ImportTsv extends Configure
throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
}
}
+
+ private String getAttributes() {
+ if (!hasAttributes()) {
+ return null;
+ } else {
+ return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
+ getColumnLength(attrKeyColumnIndex));
+ }
+ }
+
+ public String[] getIndividualAttributes() {
+ String attributes = getAttributes();
+ if (attributes != null) {
+ return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
+ } else {
+ return null;
+ }
+ }
+
+ public int getAttributeKeyOffset() {
+ if (hasAttributes()) {
+ return getColumnOffset(attrKeyColumnIndex);
+ } else {
+ return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+ }
+ }
+
+ public int getAttributeKeyLength() {
+ if (hasAttributes()) {
+ return getColumnLength(attrKeyColumnIndex);
+ } else {
+ return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+ }
+ }
+
public int getColumnOffset(int idx) {
if (idx > 0)
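
Putting the ParsedLine additions together: after parse(), getIndividualAttributes() returns the raw key=>value strings from the attributes column, split on ",". A small end-to-end sketch (the input line is invented):

    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ParsedLineSketch {
      public static void main(String[] args) throws Exception {
        TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:c1,HBASE_ATTRIBUTES_KEY", "\t");
        byte[] line = Bytes.toBytes("row1\tcellvalue\tattr1=>val1,attr2=>val2");
        // parse() throws BadTsvLineException if the attributes column is missing.
        ParsedLine parsed = parser.parse(line, line.length);
        for (String attr : parsed.getIndividualAttributes()) {
          System.out.println(attr);  // attr1=>val1, then attr2=>val2
        }
      }
    }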
@@ -398,6 +455,9 @@ public class ImportTsv extends Configure
"Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
"Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
"\n" +
+ TsvParser.ATTRIBUTES_COLUMN_SPEC+" can be used to specify Operation Attributes per record.\n"+
+ " Should be specified as key=>value where "+TsvParser.DEFAULT_ATTRIBUTES_COLUMN_INDEX+ " is used \n"+
+ " as the seperator. Note that more than one OperationAttributes can be specified.\n"+
"By default importtsv will load data directly into HBase. To instead generate\n" +
"HFiles of data to prepare for a bulk data load, pass the option:\n" +
" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
@@ -460,10 +520,21 @@ public class ImportTsv extends Configure
+ TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
return -1;
}
+
+ int attrKeysFound = 0;
+ for (String col : columns) {
+ if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
+ attrKeysFound++;
+ }
+ if (attrKeysFound > 1) {
+ usage("Must specify at most one column as "
+ + TsvParser.ATTRIBUTES_COLUMN_SPEC);
+ return -1;
+ }
// Make sure one or more columns are specified excluding rowkey and
// timestamp key
- if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+ if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
usage("One or more columns in addition to the row key and timestamp(optional) are required");
return -1;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1535487&r1=1535486&r2=1535487&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Thu Oct 24 18:34:27 2013
@@ -17,19 +17,20 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Counter;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
/**
* Write table content out to files in hdfs.
@@ -41,7 +42,7 @@ extends Mapper<LongWritable, Text, Immut
{
/** Timestamp for all inserted rows */
- private long ts;
+ protected long ts;
/** Column seperator */
private String separator;
@@ -50,7 +51,9 @@ extends Mapper<LongWritable, Text, Immut
private boolean skipBadLines;
private Counter badLineCount;
- private ImportTsv.TsvParser parser;
+ protected ImportTsv.TsvParser parser;
+
+ protected Configuration conf;
public long getTs() {
return ts;
@@ -80,8 +83,7 @@ extends Mapper<LongWritable, Text, Immut
protected void setup(Context context) {
doSetup(context);
- Configuration conf = context.getConfiguration();
-
+ conf = context.getConfiguration();
parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
separator);
if (parser.getRowKeyColumnIndex() == -1) {
@@ -104,7 +106,6 @@ extends Mapper<LongWritable, Text, Immut
} else {
separator = new String(Base64.decode(separator));
}
-
// Should never get 0 as we are setting this to a valid value in job
// configuration.
ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
@@ -135,18 +136,11 @@ extends Mapper<LongWritable, Text, Immut
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
- if (i == parser.getRowKeyColumnIndex()
- || i == parser.getTimestampKeyColumnIndex()) {
+ if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
+ || i == parser.getAttributesKeyColumnIndex()) {
continue;
}
- KeyValue kv = new KeyValue(
- lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
- parser.getFamily(i), 0, parser.getFamily(i).length,
- parser.getQualifier(i), 0, parser.getQualifier(i).length,
- ts,
- KeyValue.Type.Put,
- lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
- put.add(kv);
+ KeyValue kv = createPuts(lineBytes, parsed, put, i);
}
context.write(rowKey, put);
} catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
@@ -173,4 +167,15 @@ extends Mapper<LongWritable, Text, Immut
e.printStackTrace();
}
}
+
+ protected KeyValue createPuts(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
+ int i) throws BadTsvLineException, IOException {
+ KeyValue kv;
+ kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+ parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+ parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+ parsed.getColumnLength(i));
+ put.add(kv);
+ return kv;
+ }
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java?rev=1535487&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java Thu Oct 24 18:34:27 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestImportTSVWithOperationAttributes implements Configurable {
+
+ protected static final Log LOG = LogFactory.getLog(TestImportTSVWithOperationAttributes.class);
+ protected static final String NAME = TestImportTsv.class.getSimpleName();
+ protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ /**
+ * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+ * false.
+ */
+ protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+ /**
+ * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+ */
+ protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+ private static Configuration conf;
+
+ private static final String TEST_ATR_KEY = "test";
+
+ private final String FAMILY = "FAM";
+
+ public Configuration getConf() {
+ return util.getConfiguration();
+ }
+
+ public void setConf(Configuration conf) {
+ throw new IllegalArgumentException("setConf not supported");
+ }
+
+ @BeforeClass
+ public static void provisionCluster() throws Exception {
+ conf = util.getConfiguration();
+ conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName());
+ conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName());
+ util.startMiniCluster();
+ HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+ util.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void releaseCluster() throws Exception {
+ util.shutdownMiniMapReduceCluster();
+ util.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMROnTable() throws Exception {
+ String tableName = "test-" + UUID.randomUUID();
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.MAPPER_CONF_KEY
+ + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+ String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n";
+ util.createTable(tableName, FAMILY);
+ doMROnTableTest(util, FAMILY, data, args, 1, true);
+ util.deleteTable(tableName);
+ }
+
+ @Test
+ public void testMROnTableWithInvalidOperationAttr() throws Exception {
+ String tableName = "test-" + UUID.randomUUID();
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.MAPPER_CONF_KEY
+ + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+ String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n";
+ util.createTable(tableName, FAMILY);
+ doMROnTableTest(util, FAMILY, data, args, 1, false);
+ util.deleteTable(tableName);
+ }
+
+ /**
+ * Run an ImportTsv job and perform basic validation on the results. Returns
+ * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
+ * for further validation as necessary. This method is static to insure
+ * non-reliance on instance's util/conf facilities.
+ *
+ * @param args
+ * Any arguments to pass BEFORE inputFile path is appended.
+ * @param dataAvailable
+ * @return The Tool instance used to run the test.
+ */
+ private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args,
+ int valueMultiplier, boolean dataAvailable) throws Exception {
+ String table = args[args.length - 1];
+ Configuration conf = new Configuration(util.getConfiguration());
+
+ // populate input file
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
+ FSDataOutputStream op = fs.create(inputPath, true);
+ op.write(Bytes.toBytes(data));
+ op.close();
+ LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+ if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+ LOG.debug("Forcing combiner.");
+ conf.setInt("min.num.spills.for.combine", 1);
+ }
+
+ // run the import
+ List<String> argv = new ArrayList<String>(Arrays.asList(args));
+ argv.add(inputPath.toString());
+ Tool tool = new ImportTsv();
+ LOG.debug("Running ImportTsv with arguments: " + argv);
+ assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+
+ validateTable(conf, table, family, valueMultiplier, dataAvailable);
+
+ if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+ LOG.debug("Deleting test subdirectory");
+ util.cleanupDataTestDirOnTestFS(table);
+ }
+ return tool;
+ }
+
+ /**
+ * Confirm ImportTsv via data in online table.
+ *
+ * @param dataAvailable
+ */
+ private static void validateTable(Configuration conf, String tableName, String family,
+ int valueMultiplier, boolean dataAvailable) throws IOException {
+
+ LOG.debug("Validating table.");
+ HTable table = new HTable(conf, tableName);
+ boolean verified = false;
+ long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ Scan scan = new Scan();
+ // Scan entire family.
+ scan.addFamily(Bytes.toBytes(family));
+ if (dataAvailable) {
+ ResultScanner resScanner = table.getScanner(scan);
+ for (Result res : resScanner) {
+ LOG.debug("Getting results " + res.size());
+ assertTrue(res.size() == 2);
+ List<Cell> kvs = res.listCells();
+ assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
+ assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
+ assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
+ assertTrue(CellUtil.matchingValue(kvs.get(1),
+ Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
+ // Only one result set is expected, so let it loop.
+ verified = true;
+ }
+ } else {
+ ResultScanner resScanner = table.getScanner(scan);
+ Result[] next = resScanner.next(2);
+ assertEquals(0, next.length);
+ verified = true;
+ }
+
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume its because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ table.close();
+ assertTrue(verified);
+ }
+
+ public static class OperationAttributesTestController extends BaseRegionObserver {
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+ Durability durability) throws IOException {
+ HRegion region = e.getEnvironment().getRegion();
+ if (!region.getRegionInfo().isMetaTable()
+ && !region.getRegionInfo().getTable().isSystemTable()) {
+ if (put.getAttribute(TEST_ATR_KEY) != null) {
+ LOG.debug("allow any put to happen " + region.getRegionNameAsString());
+ } else {
+ e.bypass();
+ }
+ }
+ }
+ }
+}
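
The coprocessor above is the verification hook: prePut lets a Put through only when it carries the "test" attribute and bypasses it otherwise, which is how testMROnTableWithInvalidOperationAttr ends up asserting an empty table. The attribute itself is plain client-side state on the operation; a minimal round-trip sketch (row and values invented):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AttributeRoundTripSketch {
      public static void main(String[] args) {
        Put put = new Put(Bytes.toBytes("row1"));
        put.setAttribute("test", Bytes.toBytes("myvalue"));
        // Server-side coprocessors observe the same attribute in prePut().
        System.out.println(Bytes.toString(put.getAttribute("test"))); // myvalue
      }
    }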
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java?rev=1535487&r1=1535486&r2=1535487&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java Thu Oct 24 18:34:27 2013
@@ -54,7 +54,7 @@ public class TestImportTsvParser {
ArrayList<String> parsedCols = new ArrayList<String>();
for (int i = 0; i < parsed.getColumnCount(); i++) {
parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
- parsed.getColumnLength(i)));
+ parsed.getColumnLength(i)));
}
if (!Iterables.elementsEqual(parsedCols, expected)) {
fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
@@ -100,6 +100,32 @@ public class TestImportTsvParser {
assertEquals(0, parser.getRowKeyColumnIndex());
assertTrue(parser.hasTimestamp());
assertEquals(2, parser.getTimestampKeyColumnIndex());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ATTRIBUTES_KEY",
+ "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertTrue(parser.hasTimestamp());
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ assertEquals(4, parser.getAttributesKeyColumnIndex());
+
+ parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ROW_KEY",
+ "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+ assertEquals(4, parser.getRowKeyColumnIndex());
+ assertTrue(parser.hasTimestamp());
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ assertEquals(0, parser.getAttributesKeyColumnIndex());
}
@Test
@@ -113,8 +139,7 @@ public class TestImportTsvParser {
assertNull(parser.getQualifier(2));
assertEquals(2, parser.getRowKeyColumnIndex());
- assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
- parser.getTimestampKeyColumnIndex());
+ assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
ParsedLine parsed = parser.parse(line, line.length);
@@ -187,14 +212,13 @@ public class TestImportTsvParser {
byte[] line = Bytes.toBytes("rowkey\tval_a");
parser.parse(line, line.length);
}
-
+
@Test
public void testTsvParserParseRowKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
assertEquals(0, parser.getRowKeyColumnIndex());
byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
- Pair<Integer, Integer> rowKeyOffsets = parser
- .parseRowKey(line, line.length);
+ Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(line, line.length);
assertEquals(0, rowKeyOffsets.getFirst().intValue());
assertEquals(5, rowKeyOffsets.getSecond().intValue());
try {
@@ -225,4 +249,50 @@ public class TestImportTsvParser {
assertEquals(16, rowKeyOffsets.getSecond().intValue());
}
+ @Test
+ public void testTsvParseAttributesKey() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY", "\t");
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value");
+ ParsedLine parse = parser.parse(line, line.length);
+ assertEquals(18, parse.getAttributeKeyOffset());
+ assertEquals(3, parser.getAttributesKeyColumnIndex());
+ String attributes[] = parse.getIndividualAttributes();
+ assertEquals(attributes[0], "key=>value");
+ try {
+ line = Bytes.toBytes("rowkey\tval_a\t1234");
+ parser.parse(line, line.length);
+ fail("Should get BadTsvLineException on empty rowkey.");
+ } catch (BadTsvLineException b) {
+
+ }
+ parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+ assertEquals(2, parser.getRowKeyColumnIndex());
+ line = Bytes.toBytes("key=>value\tval_a\trowkey\t1234");
+ parse = parser.parse(line, line.length);
+ assertEquals(0, parse.getAttributeKeyOffset());
+ assertEquals(0, parser.getAttributesKeyColumnIndex());
+ attributes = parse.getIndividualAttributes();
+ assertEquals(attributes[0], "key=>value");
+ try {
+ line = Bytes.toBytes("val_a");
+ ParsedLine parse2 = parser.parse(line, line.length);
+ fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+ } catch (BadTsvLineException b) {
+
+ }
+ parser = new TsvParser("col_a,HBASE_ATTRIBUTES_KEY,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+ assertEquals(3, parser.getRowKeyColumnIndex());
+ line = Bytes.toBytes("val_a\tkey0=>value0,key1=>value1,key2=>value2\t1234\trowkey");
+ parse = parser.parse(line, line.length);
+ assertEquals(1, parser.getAttributesKeyColumnIndex());
+ assertEquals(6, parse.getAttributeKeyOffset());
+ String[] attr = parse.getIndividualAttributes();
+ int i = 0;
+ for(String str : attr) {
+ assertEquals(("key"+i+"=>"+"value"+i), str );
+ i++;
+ }
+ }
+
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java?rev=1535487&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java Thu Oct 24 18:34:27 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ *
+ * Just shows a simple example of how the attributes can be extracted and added
+ * to the puts
+ */
+public class TsvImporterCustomTestMapperForOprAttr extends TsvImporterMapper {
+ @Override
+ protected KeyValue createPuts(byte[] lineBytes, ParsedLine parsed, Put put, int i)
+ throws BadTsvLineException, IOException {
+ KeyValue kv;
+ kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+ parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+ parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+ parsed.getColumnLength(i));
+ if (parsed.getIndividualAttributes() != null) {
+ String[] attributes = parsed.getIndividualAttributes();
+ for (String attr : attributes) {
+ String[] split = attr.split(ImportTsv.DEFAULT_ATTRIBUTES_SEPERATOR);
+ if (split == null || split.length <= 1) {
+ throw new BadTsvLineException("Invalid attributes seperator specified" + attributes);
+ } else {
+ if (split[0].length() <= 0 || split[1].length() <= 0) {
+ throw new BadTsvLineException("Invalid attributes seperator specified" + attributes);
+ }
+ put.setAttribute(split[0], Bytes.toBytes(split[1]));
+ }
+ }
+ }
+ put.add(kv);
+ return kv;
+ }
+}