Posted to commits@hbase.apache.org by ra...@apache.org on 2013/10/24 20:34:28 UTC

svn commit: r1535487 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: ramkrishna
Date: Thu Oct 24 18:34:27 2013
New Revision: 1535487

URL: http://svn.apache.org/r1535487
Log:
HBASE-9767 - Support OperationAttributes in ImportTSV Parser (Ram)
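
In short: the column specification passed via importtsv.columns may now include
a HBASE_ATTRIBUTES_KEY column; the corresponding field of each input line then
carries operation attributes as key=>value pairs (comma separated when there is
more than one), which a custom mapper can apply to the Puts it generates. An
illustrative column spec and matching data line (family, qualifier and attribute
names are made up for the example):

    importtsv.columns = HBASE_ROW_KEY,d:c1,HBASE_ATTRIBUTES_KEY
    row1<TAB>value1<TAB>key0=>value0,key1=>value1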


Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1535487&r1=1535486&r2=1535487&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Thu Oct 24 18:34:27 2013
@@ -82,8 +82,10 @@ public class ImportTsv extends Configure
   public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
   public final static String COLUMNS_CONF_KEY = "importtsv.columns";
   public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-
+  public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
   final static String DEFAULT_SEPARATOR = "\t";
+  final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
+  final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
 
   public static class TsvParser {
@@ -108,11 +110,20 @@ public class ImportTsv extends Configure
 
     public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
 
+    public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
+
+    private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+
+    public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+     * @param seperatorStr the single-character column separator
      */
-    public TsvParser(String columnsSpecification, String separatorStr) {
+    public TsvParser(String columnsSpecification, String seperatorStr) {
+      this(columnsSpecification, seperatorStr, null);
+    }
+    public TsvParser(String columnsSpecification, String separatorStr, String tagSeperatorStr) {
       // Configure separator
       byte[] separator = Bytes.toBytes(separatorStr);
       Preconditions.checkArgument(separator.length == 1,
@@ -133,12 +144,14 @@ public class ImportTsv extends Configure
           rowKeyColumnIndex = i;
           continue;
         }
-        
         if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
           timestampKeyColumnIndex = i;
           continue;
         }
-        
+        if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+          attrKeyColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -158,6 +171,13 @@ public class ImportTsv extends Configure
       return timestampKeyColumnIndex;
     }
 
+    public boolean hasAttributes() {
+      return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+    }
+
+    public int getAttributesKeyColumnIndex() {
+      return attrKeyColumnIndex;
+    }
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
@@ -190,6 +210,8 @@ public class ImportTsv extends Configure
       } else if (hasTimestamp()
           && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
         throw new BadTsvLineException("No timestamp");
+      } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
+        throw new BadTsvLineException("No attributes specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -226,6 +248,41 @@ public class ImportTsv extends Configure
           throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
         }
       }
+
+      private String getAttributes() {
+        if (!hasAttributes()) {
+          return null;
+        } else {
+          return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
+              getColumnLength(attrKeyColumnIndex));
+        }
+      }
+      
+      public String[] getIndividualAttributes() {
+        String attributes = getAttributes();
+        if (attributes != null) {
+          return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
+        } else {
+          return null;
+        }
+      }
+       
+      public int getAttributeKeyOffset() {
+        if (hasAttributes()) {
+          return getColumnOffset(attrKeyColumnIndex);
+        } else {
+          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+        }
+      }
+
+      public int getAttributeKeyLength() {
+        if (hasAttributes()) {
+          return getColumnLength(attrKeyColumnIndex);
+        } else {
+          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+        }
+      }
+      
       
       public int getColumnOffset(int idx) {
         if (idx > 0)
@@ -398,6 +455,9 @@ public class ImportTsv extends Configure
       "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
       "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
       "\n" +
+      TsvParser.ATTRIBUTES_COLUMN_SPEC + " can be used to specify Operation Attributes per record.\n" +
+      " Attributes should be specified as key=>value pairs, where '" + DEFAULT_ATTRIBUTES_SEPERATOR + "'\n" +
+      " separates key from value. Multiple attributes are separated by '" + DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR + "'.\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
       "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
@@ -460,10 +520,21 @@ public class ImportTsv extends Configure
             + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
         return -1;
       }
+      
+      int attrKeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
+          attrKeysFound++;
+      }
+      if (attrKeysFound > 1) {
+        usage("Must specify at most one column as "
+            + TsvParser.ATTRIBUTES_COLUMN_SPEC);
+        return -1;
+      }
     
       // Make sure one or more columns are specified excluding rowkey and
       // timestamp key
-      if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+      if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
         usage("One or more columns in addition to the row key and timestamp(optional) are required");
         return -1;
       }
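
For reference, a minimal sketch of driving the extended parser directly. This is
an illustrative standalone class, not part of the commit; it uses only the public
TsvParser API added above, and the "=>" / "," tokens are the
DEFAULT_ATTRIBUTES_SEPERATOR / DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR defaults:

    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AttributesParseSketch {
      public static void main(String[] args) throws BadTsvLineException {
        // HBASE_ATTRIBUTES_KEY marks the column that carries the attributes.
        TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:c1,HBASE_ATTRIBUTES_KEY", "\t");
        byte[] line = Bytes.toBytes("row1\tvalue1\tkey0=>value0,key1=>value1");
        ParsedLine parsed = parser.parse(line, line.length);
        // Attributes come back as "key=>value" tokens, already split on ",".
        for (String attr : parsed.getIndividualAttributes()) {
          String[] kv = attr.split("=>");
          System.out.println(kv[0] + " -> " + kv[1]);
        }
      }
    }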

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1535487&r1=1535486&r2=1535487&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Thu Oct 24 18:34:27 2013
@@ -17,19 +17,20 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Counter;
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
 
 /**
  * Write table content out to files in hdfs.
@@ -41,7 +42,7 @@ extends Mapper<LongWritable, Text, Immut
 {
 
   /** Timestamp for all inserted rows */
-  private long ts;
+  protected long ts;
 
   /** Column seperator */
   private String separator;
@@ -50,7 +51,9 @@ extends Mapper<LongWritable, Text, Immut
   private boolean skipBadLines;
   private Counter badLineCount;
 
-  private ImportTsv.TsvParser parser;
+  protected ImportTsv.TsvParser parser;
+
+  protected Configuration conf;
 
   public long getTs() {
     return ts;
@@ -80,8 +83,7 @@ extends Mapper<LongWritable, Text, Immut
   protected void setup(Context context) {
     doSetup(context);
 
-    Configuration conf = context.getConfiguration();
-
+    conf = context.getConfiguration();
     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
                            separator);
     if (parser.getRowKeyColumnIndex() == -1) {
@@ -104,7 +106,6 @@ extends Mapper<LongWritable, Text, Immut
     } else {
       separator = new String(Base64.decode(separator));
     }
-
     // Should never get 0 as we are setting this to a valid value in job
     // configuration.
     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
@@ -135,18 +136,11 @@ extends Mapper<LongWritable, Text, Immut
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()
-            || i == parser.getTimestampKeyColumnIndex()) {
+        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
+            || i == parser.getAttributesKeyColumnIndex()) {
           continue;
         }
-        KeyValue kv = new KeyValue(
-            lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-            parser.getFamily(i), 0, parser.getFamily(i).length,
-            parser.getQualifier(i), 0, parser.getQualifier(i).length,
-            ts,
-            KeyValue.Type.Put,
-            lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
-        put.add(kv);
+        KeyValue kv = createPuts(lineBytes, parsed, put, i);
       }
       context.write(rowKey, put);
     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
@@ -173,4 +167,15 @@ extends Mapper<LongWritable, Text, Immut
       e.printStackTrace();
     }
   }
+
+  protected KeyValue createPuts(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
+      int i) throws BadTsvLineException, IOException {
+    KeyValue kv;
+    kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+        parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+        parsed.getColumnLength(i));
+    put.add(kv);
+    return kv;
+  }
 }
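
The new protected createPuts hook (together with the now-protected ts, parser
and conf fields) is the extension point this commit adds: a subclass can let the
base mapper build the KeyValue as usual and then decorate the Put. A minimal
sketch of a hypothetical subclass (class and attribute names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TsvImporterMapper;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AuditingTsvImporterMapper extends TsvImporterMapper {
      @Override
      protected KeyValue createPuts(byte[] lineBytes, ParsedLine parsed, Put put, int i)
          throws BadTsvLineException, IOException {
        // Let the base mapper build the KeyValue and add it to the Put...
        KeyValue kv = super.createPuts(lineBytes, parsed, put, i);
        // ...then tag the mutation with a (hypothetical) operation attribute.
        put.setAttribute("import.source", Bytes.toBytes("importtsv"));
        return kv;
      }
    }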

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java?rev=1535487&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java Thu Oct 24 18:34:27 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestImportTSVWithOperationAttributes implements Configurable {
+
+  protected static final Log LOG = LogFactory.getLog(TestImportTSVWithOperationAttributes.class);
+  protected static final String NAME = TestImportTSVWithOperationAttributes.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * false.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private static Configuration conf;
+
+  private static final String TEST_ATR_KEY = "test";
+
+  private final String FAMILY = "FAM";
+
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName());
+    conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName());
+    util.startMiniCluster();
+    HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+    util.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniMapReduceCluster();
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    String tableName = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, true);
+    util.deleteTable(tableName);
+  }
+
+  @Test
+  public void testMROnTableWithInvalidOperationAttr() throws Exception {
+    String tableName = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, false);
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Run an ImportTsv job and perform basic validation on the results. Returns
+   * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
+   * for further validation as necessary.
+   *
+   * @param util the HBaseTestingUtility to run the job against
+   * @param args
+   *          Any arguments to pass BEFORE inputFile path is appended.
+   * @param dataAvailable whether the loaded rows should be visible to the scan
+   * @return The Tool instance used to run the test.
+   */
+  private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args,
+      int valueMultiplier, boolean dataAvailable) throws Exception {
+    String table = args[args.length - 1];
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("min.num.spills.for.combine", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<String>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+
+    validateTable(conf, table, family, valueMultiplier, dataAvailable);
+
+    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+      LOG.debug("Deleting test subdirectory");
+      util.cleanupDataTestDirOnTestFS(table);
+    }
+    return tool;
+  }
+
+  /**
+   * Confirm ImportTsv via data in online table.
+   * 
+   * @param dataAvailable
+   */
+  private static void validateTable(Configuration conf, String tableName, String family,
+      int valueMultiplier, boolean dataAvailable) throws IOException {
+
+    LOG.debug("Validating table.");
+    HTable table = new HTable(conf, tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        Scan scan = new Scan();
+        // Scan entire family.
+        scan.addFamily(Bytes.toBytes(family));
+        if (dataAvailable) {
+          ResultScanner resScanner = table.getScanner(scan);
+          for (Result res : resScanner) {
+            LOG.debug("Getting results " + res.size());
+            assertTrue(res.size() == 2);
+            List<Cell> kvs = res.listCells();
+            assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
+            assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
+            assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
+            assertTrue(CellUtil.matchingValue(kvs.get(1),
+                Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
+            // Only one result set is expected, so let it loop.
+            verified = true;
+          }
+        } else {
+          ResultScanner resScanner = table.getScanner(scan);
+          Result[] next = resScanner.next(2);
+          assertEquals(0, next.length);
+          verified = true;
+        }
+
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume its because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    table.close();
+    assertTrue(verified);
+  }
+
+  public static class OperationAttributesTestController extends BaseRegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      HRegion region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        if (put.getAttribute(TEST_ATR_KEY) != null) {
+          LOG.debug("allow any put to happen " + region.getRegionNameAsString());
+        } else {
+          e.bypass();
+        }
+      }
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java?rev=1535487&r1=1535486&r2=1535487&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java Thu Oct 24 18:34:27 2013
@@ -54,7 +54,7 @@ public class TestImportTsvParser {
     ArrayList<String> parsedCols = new ArrayList<String>();
     for (int i = 0; i < parsed.getColumnCount(); i++) {
       parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
-        parsed.getColumnLength(i)));
+          parsed.getColumnLength(i)));
     }
     if (!Iterables.elementsEqual(parsedCols, expected)) {
       fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
@@ -100,6 +100,32 @@ public class TestImportTsvParser {
     assertEquals(0, parser.getRowKeyColumnIndex());
     assertTrue(parser.hasTimestamp());
     assertEquals(2, parser.getTimestampKeyColumnIndex());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ATTRIBUTES_KEY",
+        "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    assertEquals(4, parser.getAttributesKeyColumnIndex());
+
+    parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ROW_KEY",
+        "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(4, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    assertEquals(0, parser.getAttributesKeyColumnIndex());
   }
 
   @Test
@@ -113,8 +139,7 @@ public class TestImportTsvParser {
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
 
-    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
-      parser.getTimestampKeyColumnIndex());
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
 
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
@@ -187,14 +212,13 @@ public class TestImportTsvParser {
     byte[] line = Bytes.toBytes("rowkey\tval_a");
     parser.parse(line, line.length);
   }
-  
+
   @Test
   public void testTsvParserParseRowKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
     assertEquals(0, parser.getRowKeyColumnIndex());
     byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
-    Pair<Integer, Integer> rowKeyOffsets = parser
-        .parseRowKey(line, line.length);
+    Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(line, line.length);
     assertEquals(0, rowKeyOffsets.getFirst().intValue());
     assertEquals(5, rowKeyOffsets.getSecond().intValue());
     try {
@@ -225,4 +249,50 @@ public class TestImportTsvParser {
     assertEquals(16, rowKeyOffsets.getSecond().intValue());
   }
 
+  @Test
+  public void testTsvParseAttributesKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY", "\t");
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value");
+    ParsedLine parse = parser.parse(line, line.length);
+    assertEquals(18, parse.getAttributeKeyOffset());
+    assertEquals(3, parser.getAttributesKeyColumnIndex());
+    String[] attributes = parse.getIndividualAttributes();
+    assertEquals("key=>value", attributes[0]);
+    try {
+      line = Bytes.toBytes("rowkey\tval_a\t1234");
+      parser.parse(line, line.length);
+      fail("Should get BadTsvLineException on empty rowkey.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+    assertEquals(2, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("key=>value\tval_a\trowkey\t1234");
+    parse = parser.parse(line, line.length);
+    assertEquals(0, parse.getAttributeKeyOffset());
+    assertEquals(0, parser.getAttributesKeyColumnIndex());
+    attributes = parse.getIndividualAttributes();
+    assertEquals("key=>value", attributes[0]);
+    try {
+      line = Bytes.toBytes("val_a");
+      parser.parse(line, line.length);
+      fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("col_a,HBASE_ATTRIBUTES_KEY,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+    assertEquals(3, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\tkey0=>value0,key1=>value1,key2=>value2\t1234\trowkey");
+    parse = parser.parse(line, line.length);
+    assertEquals(1, parser.getAttributesKeyColumnIndex());
+    assertEquals(6, parse.getAttributeKeyOffset());
+    String[] attr = parse.getIndividualAttributes();
+    int i = 0;
+    for (String str : attr) {
+      assertEquals("key" + i + "=>value" + i, str);
+      i++;
+    }
+  }
+
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java?rev=1535487&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java Thu Oct 24 18:34:27 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ *
+ * A simple example of how the operation attributes parsed from each line
+ * can be extracted and applied to the corresponding Puts via Put#setAttribute.
+ */
+public class TsvImporterCustomTestMapperForOprAttr extends TsvImporterMapper {
+  @Override
+  protected KeyValue createPuts(byte[] lineBytes, ParsedLine parsed, Put put, int i)
+      throws BadTsvLineException, IOException {
+    KeyValue kv;
+    kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+        parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+        parsed.getColumnLength(i));
+    if (parsed.getIndividualAttributes() != null) {
+      String[] attributes = parsed.getIndividualAttributes();
+      for (String attr : attributes) {
+        String[] split = attr.split(ImportTsv.DEFAULT_ATTRIBUTES_SEPERATOR);
+        if (split == null || split.length <= 1) {
+          throw new BadTsvLineException("Invalid attributes separator specified: " + attr);
+        } else {
+          if (split[0].length() <= 0 || split[1].length() <= 0) {
+            throw new BadTsvLineException("Invalid attribute key/value specified: " + attr);
+          }
+          put.setAttribute(split[0], Bytes.toBytes(split[1]));
+        }
+      }
+    }
+    put.add(kv);
+    return kv;
+  }
+}
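
Putting it together, a sketch of launching the import with the custom mapper
above. Table name, column family and input path are illustrative; only the
ImportTsv constants that appear in this commit are assumed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.util.ToolRunner;

    public class RunImportWithAttributes {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Row key, two data columns, and the operation-attributes column.
        conf.set(ImportTsv.COLUMNS_CONF_KEY,
            "HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY");
        // Use the attribute-aware example mapper added by this commit.
        conf.set(ImportTsv.MAPPER_CONF_KEY,
            "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr");
        int exit = ToolRunner.run(conf, new ImportTsv(),
            new String[] { "myTable", "/input/data.tsv" });
        System.exit(exit);
      }
    }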