You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2019/12/11 22:16:14 UTC

[GitHub] [accumulo-testing] phrocker commented on a change in pull request #119: Add asynchronous input format and bulk key.

phrocker commented on a change in pull request #119: Add asynchronous input format and bulk key.
URL: https://github.com/apache/accumulo-testing/pull/119#discussion_r356864469
 
 

 ##########
 File path: src/main/java/org/apache/accumulo/testing/continuous/BulkKey.java
 ##########
 @@ -0,0 +1,236 @@
+package org.apache.accumulo.testing.continuous;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Supports immediate sorting via eager deser of the key object. This has the benefit of reducing
+ * the amount of deserialization that may occur when sorting keys in memory
+ */
+public class BulkKey implements WritableComparable<BulkKey> {
+
+  protected Key key = new Key();
+  protected int hashCode = 31;
+
+  static final byte[] EMPTY = {};
+
+  Text row = new Text();
+  Text cf = new Text();
+  Text cq = new Text();
+  Text cv = new Text();
+
+  public BulkKey() {}
+
+  public BulkKey(Key key) {
+    this.key = key;
+    hashCode = key.hashCode();
+  }
+
+  public BulkKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean deleted) {
+    // don't copy the arrays
+    this.key = new Key(row, cf, cq, cv, ts, deleted, false);
+    hashCode = key.hashCode();
+  }
+
+  public Key getKey() {
+    return key;
+  }
+
+  public ByteSequence getRowData() {
+    return key.getRowData();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+
+    final int rowsize = WritableUtils.readVInt(in);
+    final byte[] row = readBytes(in, rowsize);
+
+    final int cfsize = WritableUtils.readVInt(in);
+    final byte[] cf = readBytes(in, cfsize);
+
+    final int cqsize = WritableUtils.readVInt(in);
+    final byte[] cq = readBytes(in, cqsize);
+
+    final int cvsize = WritableUtils.readVInt(in);
+    final byte[] cv = readBytes(in, cvsize);
+
+    final long ts = WritableUtils.readVLong(in);
+    boolean isDeleted = in.readBoolean();
+
+    key = new Key(row, cf, cq, cv, ts, isDeleted, false);
+
+    hashCode = key.hashCode();
+  }
+
+  private static byte[] readBytes(DataInput in, int size) throws IOException {
+    if (size == 0)
+      return EMPTY;
+    final byte[] data = new byte[size];
+    in.readFully(data, 0, data.length);
+    return data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+
+    key.getRow(row);
+    key.getColumnFamily(cf);
+    key.getColumnQualifier(cq);
+    key.getColumnVisibility(cv);
+
+    WritableUtils.writeVInt(out, row.getLength());
 
 Review comment:
   @keith-turner I'm iteratively burning down my PRs. I will post updates to this on Friday with some testing. I have a sizable cluster where I can run this and post timing then. thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services