Posted to commits@hbase.apache.org by an...@apache.org on 2014/03/01 11:51:37 UTC

svn commit: r1573150 - in /hbase/branches/0.98: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/ hbase-common/src/test/java/org/ap...

Author: anoopsamjohn
Date: Sat Mar  1 10:51:36 2014
New Revision: 1573150

URL: http://svn.apache.org/r1573150
Log:
HBASE-10451 Re-enable Tag compression on HFiles. (Anoop)

Added:
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
Modified:
    hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
    hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
    hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
    hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java

Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Sat Mar  1 10:51:36 2014
@@ -195,9 +195,8 @@ public class HColumnDescriptor implement
 
   /**
    * Default compress tags along with any type of DataBlockEncoding.
-   * Disabled to false by default in 0.98.0
    */
-  public static final boolean DEFAULT_COMPRESS_TAGS = false;
+  public static final boolean DEFAULT_COMPRESS_TAGS = true;
 
   private final static Map<String, String> DEFAULT_VALUES
     = new HashMap<String, String>();

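With the default flipped to true, tags are compressed whenever a family uses any DataBlockEncoding. A family can still opt out explicitly. A minimal sketch, assuming HColumnDescriptor's setCompressTags(boolean) mutator (the counterpart of the shouldCompressTags() accessor that HStore consults below):

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

    HColumnDescriptor hcd = new HColumnDescriptor("f");
    hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX);
    hcd.setCompressTags(false); // opt out of the new default (DEFAULT_COMPRESS_TAGS = true)
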
Modified: hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java (original)
+++ hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java Sat Mar  1 10:51:36 2014
@@ -41,12 +41,12 @@ import org.apache.hadoop.io.IOUtils;
 public class TagCompressionContext {
   private final Dictionary tagDict;
 
-  public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
-      NoSuchMethodException, InstantiationException, IllegalAccessException,
-      InvocationTargetException {
+  public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
+      throws SecurityException, NoSuchMethodException, InstantiationException,
+      IllegalAccessException, InvocationTargetException {
     Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
     tagDict = dictConstructor.newInstance();
-    tagDict.init(Short.MAX_VALUE);
+    tagDict.init(dictCapacity);
   }
 
   public void clear() {
@@ -131,10 +131,12 @@ public class TagCompressionContext {
    * @param dest Destination array where to write the uncompressed tags
    * @param offset Offset in destination where tags to be written
    * @param length Length of all tag bytes
+   * @return The number of bytes read from the source to uncompress all the tags.
    * @throws IOException
    */
-  public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+  public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
       throws IOException {
+    int srcBeginPos = src.position();
     int endOffset = offset + length;
     while (offset < endOffset) {
       byte status = src.get();
@@ -158,6 +160,7 @@ public class TagCompressionContext {
         offset += tagLen;
       }
     }
+    return src.position() - srcBeginPos;
   }
 
   /**

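The two API changes in this class show up in a round trip: the constructor now takes the dictionary capacity, and uncompressTags() returns the number of source bytes it consumed, which the seeker below needs in order to skip over already-decoded tags. A minimal sketch, mirroring the usage in TestTagCompressionContext (kv stands for any KeyValue carrying tags):

    TagCompressionContext ctx = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    short tagsLen = kv.getTagsLength();
    ctx.compressTags(baos, kv.getTagsArray(), kv.getTagsOffset(), tagsLen);

    ByteBuffer src = ByteBuffer.wrap(baos.toByteArray());
    byte[] dest = new byte[tagsLen];
    int consumed = ctx.uncompressTags(src, dest, 0, tagsLen); // new: bytes read from src
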
Modified: hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (original)
+++ hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java Sat Mar  1 10:51:36 2014
@@ -52,12 +52,20 @@ abstract class BufferedDataBlockEncoder 
 
     HFileBlockDefaultDecodingContext decodingCtx =
         (HFileBlockDefaultDecodingContext) blkDecodingCtx;
-    if (decodingCtx.getHFileContext().isCompressTags()) {
-      try {
-        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
-        decodingCtx.setTagCompressionContext(tagCompressionContext);
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize TagCompressionContext", e);
+    if (decodingCtx.getHFileContext().isIncludesTags()
+        && decodingCtx.getHFileContext().isCompressTags()) {
+      if (decodingCtx.getTagCompressionContext() != null) {
+        // Avoid the overhead of creating a new TagCompressionContext for every block
+        // decoding; reuse the existing one after clearing its state.
+        decodingCtx.getTagCompressionContext().clear();
+      } else {
+        try {
+          TagCompressionContext tagCompressionContext = new TagCompressionContext(
+              LRUDictionary.class, Byte.MAX_VALUE);
+          decodingCtx.setTagCompressionContext(tagCompressionContext);
+        } catch (Exception e) {
+          throw new IOException("Failed to initialize TagCompressionContext", e);
+        }
       }
     }
     return internalDecodeKeyValues(source, 0, 0, decodingCtx);
@@ -70,6 +78,8 @@ abstract class BufferedDataBlockEncoder 
     protected int lastCommonPrefix;
     protected int tagsLength = 0;
     protected int tagsOffset = -1;
+    protected int tagsCompressedLength = 0;
+    protected boolean uncompressTags = true;
 
     /** We need to store a copy of the key. */
     protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
@@ -84,6 +94,8 @@ abstract class BufferedDataBlockEncoder 
 
     protected void invalidate() {
       valueOffset = -1;
+      tagsCompressedLength = 0;
+      uncompressTags = true;
     }
 
     protected void ensureSpaceForKey() {
@@ -160,7 +172,7 @@ abstract class BufferedDataBlockEncoder 
       this.decodingCtx = decodingCtx;
       if (decodingCtx.getHFileContext().isCompressTags()) {
         try {
-          tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
         } catch (Exception e) {
           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
         }
@@ -249,6 +261,9 @@ abstract class BufferedDataBlockEncoder 
     @Override
     public void rewind() {
       currentBuffer.rewind();
+      if (tagCompressionContext != null) {
+        tagCompressionContext.clear();
+      }
       decodeFirst();
       previous.invalidate();
     }
@@ -266,13 +281,18 @@ abstract class BufferedDataBlockEncoder 
     protected void decodeTags() {
       current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
       if (tagCompressionContext != null) {
-        // Tag compression is been used. uncompress it into tagsBuffer
-        current.ensureSpaceForTags();
-        try {
-          tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
-              current.tagsLength);
-        } catch (IOException e) {
-          throw new RuntimeException("Exception while uncompressing tags", e);
+        if (current.uncompressTags) {
+          // Tag compression is being used. Uncompress the tags into tagsBuffer.
+          current.ensureSpaceForTags();
+          try {
+            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
+                current.tagsBuffer, 0, current.tagsLength);
+          } catch (IOException e) {
+            throw new RuntimeException("Exception while uncompressing tags", e);
+          }
+        } else {
+          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
+          current.uncompressTags = true; // Reset this.
         }
         current.tagsOffset = -1;
       } else {
@@ -355,7 +375,15 @@ abstract class BufferedDataBlockEncoder 
 
       // move after last key value
       currentBuffer.position(current.nextKvOffset);
-
+      // The tag bytes have already been decoded. Cache the decoded tags and their total
+      // compressed length in the current state so that the next decodeNext() need not decode
+      // them again; decoding the same bytes twice would pollute the dictionary used for the
+      // compression. When current.uncompressTags is false, we simply reuse current.tagsBuffer
+      // and skip 'tagsCompressedLength' bytes of the source stream.
+      // See decodeTags().
+      current.tagsBuffer = previous.tagsBuffer;
+      current.tagsCompressedLength = previous.tagsCompressedLength;
+      current.uncompressTags = false;
       previous.invalidate();
     }
 
@@ -468,12 +496,20 @@ abstract class BufferedDataBlockEncoder 
         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
     encodingCtx.prepareEncoding();
     DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
-    if (encodingCtx.getHFileContext().isCompressTags()) {
-      try {
-        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
-        encodingCtx.setTagCompressionContext(tagCompressionContext);
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize TagCompressionContext", e);
+    if (encodingCtx.getHFileContext().isIncludesTags()
+        && encodingCtx.getHFileContext().isCompressTags()) {
+      if (encodingCtx.getTagCompressionContext() != null) {
+        // Avoid the overhead of creating a new TagCompressionContext for every block
+        // encoding; reuse the existing one after clearing its state.
+        encodingCtx.getTagCompressionContext().clear();
+      } else {
+        try {
+          TagCompressionContext tagCompressionContext = new TagCompressionContext(
+              LRUDictionary.class, Byte.MAX_VALUE);
+          encodingCtx.setTagCompressionContext(tagCompressionContext);
+        } catch (Exception e) {
+          throw new IOException("Failed to initialize TagCompressionContext", e);
+        }
       }
     }
     internalEncodeKeyValues(dataOut, in, encodingCtx);

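The seeker changes above implement a decode-once pattern: the first decodeNext() uncompresses a cell's tags and records how many compressed source bytes that took; if the seeker then revisits the same cell, it reuses the cached tagsBuffer and merely skips tagsCompressedLength bytes, since pushing the same bytes through the LRU dictionary twice would corrupt its state. A self-contained sketch of the pattern with a trivial stand-in codec (all names here are illustrative, not HBase APIs):

    import java.nio.ByteBuffer;

    public class SkipOnRedecodeSketch {
      private byte[] cachedTags;           // stands in for current.tagsBuffer
      private int tagsCompressedLength;    // source bytes consumed by the first decode
      private boolean uncompressTags = true;

      void decodeTags(ByteBuffer src, int uncompressedLen) {
        if (uncompressTags) {
          int begin = src.position();
          cachedTags = new byte[uncompressedLen];
          src.get(cachedTags);             // trivial stand-in for the dictionary decode
          tagsCompressedLength = src.position() - begin;
        } else {
          // Revisiting the same cell: reuse cachedTags, just advance past the compressed bytes.
          src.position(src.position() + tagsCompressedLength);
          uncompressTags = true;           // reset, as the patch does
        }
      }

      public static void main(String[] args) {
        SkipOnRedecodeSketch s = new SkipOnRedecodeSketch();
        ByteBuffer buf = ByteBuffer.wrap(new byte[] { 1, 2, 3 });
        s.decodeTags(buf, 3);              // first pass decodes and records the consumed length
        buf.position(0);                   // seek back to the same cell
        s.uncompressTags = false;
        s.decodeTags(buf, 3);              // second pass skips without touching the 'dictionary'
      }
    }
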
Modified: hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java (original)
+++ hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java Sat Mar  1 10:51:36 2014
@@ -45,7 +45,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTags1() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     KeyValue kv1 = createKVWithTags(2);
     short tagsLength1 = kv1.getTagsLength();
     ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
@@ -71,7 +71,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTags2() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     KeyValue kv1 = createKVWithTags(1);
     short tagsLength1 = kv1.getTagsLength();
     context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Mar  1 10:51:36 2014
@@ -891,15 +891,11 @@ public class HStore implements Store {
     if (compression == null) {
       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
     }
-    if(family.shouldCompressTags()) {
-      LOG.warn("HFile tag compression attribute ignored for '" + family.getNameAsString()
-          + "', see HBASE-10443.");
-    }
     HFileContext hFileContext = new HFileContextBuilder()
                                 .withIncludesMvcc(includeMVCCReadpoint)
                                 .withIncludesTags(includesTag)
                                 .withCompression(compression)
-                                .withCompressTags(false)
+                                .withCompressTags(family.shouldCompressTags())
                                 .withChecksumType(checksumType)
                                 .withBytesPerCheckSum(bytesPerChecksum)
                                 .withBlockSize(blocksize)

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java Sat Mar  1 10:51:36 2014
@@ -64,7 +64,7 @@ class CompressionContext {
     familyDict.init(Byte.MAX_VALUE);
     qualifierDict.init(Byte.MAX_VALUE);
     if (hasTagCompression) {
-      tagCompressionContext = new TagCompressionContext(dictType);
+      tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
     }
   }
 

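Two tag-dictionary capacities are now in play: the long-lived WAL compression context keeps its previous Short.MAX_VALUE entries, while the short-lived per-block HFile encode/decode contexts above use Byte.MAX_VALUE. Side by side:

    // Long-lived, per-WAL context: large dictionary, unchanged capacity
    TagCompressionContext walCtx =
        new TagCompressionContext(LRUDictionary.class, Short.MAX_VALUE);

    // Per-block HFile encode/decode context: small dictionary, cleared between blocks
    TagCompressionContext blockCtx =
        new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
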
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Sat Mar  1 10:51:36 2014
@@ -70,7 +70,6 @@ public class TestEncodedSeekers {
 
   private final HBaseTestingUtility testUtil = HBaseTestingUtility.createLocalHTU();
   private final DataBlockEncoding encoding;
-  private final boolean encodeOnDisk;
   private final boolean includeTags;
   private final boolean compressTags;
 
@@ -82,28 +81,24 @@ public class TestEncodedSeekers {
     List<Object[]> paramList = new ArrayList<Object[]>();
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
       for (boolean includeTags : new boolean[] { false, true }) {
-        for (boolean encodeOnDisk : new boolean[] { false, true }) {
-          for (boolean compressTags : new boolean[] { false, true }) {
-            paramList.add(new Object[] { encoding, encodeOnDisk, includeTags, compressTags });
-          }
+        for (boolean compressTags : new boolean[] { false, true }) {
+          paramList.add(new Object[] { encoding, includeTags, compressTags });
         }
       }
     }
     return paramList;
   }
 
-  public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags,
-      boolean compressTags) {
+  public TestEncodedSeekers(DataBlockEncoding encoding, boolean includeTags, boolean compressTags) {
     this.encoding = encoding;
-    this.encodeOnDisk = encodeOnDisk;
     this.includeTags = includeTags;
     this.compressTags = compressTags;
   }
 
   @Test
   public void testEncodedSeeker() throws IOException {
-    System.err.println("Testing encoded seekers for encoding : " + encoding + ", encodeOnDisk : "
-        + encodeOnDisk + ", includeTags : " + includeTags + ", compressTags : " + compressTags);
+    System.err.println("Testing encoded seekers for encoding : " + encoding + ", includeTags : "
+        + includeTags + ", compressTags : " + compressTags);
     if(includeTags) {
       testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
     }

Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java?rev=1573150&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java Sat Mar  1 10:51:36 2014
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestStoreFileScannerWithTagCompression {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  private static CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
+  private static String ROOT_DIR = TEST_UTIL.getDataTestDir(
+      "TestStoreFileScannerWithTagCompression").toString();
+  private static FileSystem fs = null;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    conf.setInt("hfile.format.version", 3);
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testReseek() throws Exception {
+    // write the file
+    Path f = new Path(ROOT_DIR, "testReseek");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).withIncludesTags(true)
+        .withCompressTags(true).withDataBlockEncoding(DataBlockEncoding.PREFIX).build();
+    // Make a store file and write data to it.
+    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs).withFilePath(f)
+        .withFileContext(meta).build();
+
+    writeStoreFile(writer);
+    writer.close();
+
+    StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
+    StoreFileScanner s = reader.getStoreFileScanner(false, false);
+    try {
+      // Reseek with a first-on-row KV for 'k2' to position just before that row
+      KeyValue k = KeyValue.createFirstOnRow(Bytes.toBytes("k2"));
+      s.reseek(k);
+      KeyValue kv = s.next(); // k3 (there is no k2 in the file)
+      kv = s.next(); // k4
+      kv = s.next(); // k5
+      byte[] key5 = Bytes.toBytes("k5");
+      assertTrue(Bytes.equals(key5, 0, key5.length, kv.getRowArray(), kv.getRowOffset(),
+          kv.getRowLength()));
+      List<Tag> tags = kv.getTags();
+      assertEquals(1, tags.size());
+      assertEquals("tag3", Bytes.toString(tags.get(0).getValue()));
+    } finally {
+      s.close();
+    }
+  }
+
+  private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
+    byte[] fam = Bytes.toBytes("f");
+    byte[] qualifier = Bytes.toBytes("q");
+    long now = System.currentTimeMillis();
+    byte[] b = Bytes.toBytes("k1");
+    Tag t1 = new Tag((byte) 1, "tag1");
+    Tag t2 = new Tag((byte) 2, "tag2");
+    Tag t3 = new Tag((byte) 3, "tag3");
+    try {
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t1 }));
+      b = Bytes.toBytes("k3");
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t2, t1 }));
+      b = Bytes.toBytes("k4");
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
+      b = Bytes.toBytes("k5");
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
+    } finally {
+      writer.close();
+    }
+  }
+}