You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/31 20:05:22 UTC

svn commit: r809680 - in /hadoop/avro/trunk: CHANGES.txt src/java/org/apache/avro/io/BinaryData.java src/test/java/org/apache/avro/TestCompare.java

Author: cutting
Date: Mon Aug 31 18:05:21 2009
New Revision: 809680

URL: http://svn.apache.org/viewvc?rev=809680&view=rev
Log:
AVRO-108.  Add Java implementation of binary comparator.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
Modified:
    hadoop/avro/trunk/CHANGES.txt

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=809680&r1=809679&r2=809680&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Mon Aug 31 18:05:21 2009
@@ -17,6 +17,9 @@
     AVRO-92. Describe JSON data encoding in specification
     document. (cutting)
 
+    AVRO-108.  Add Java implementation of binary comparator.
+    (cutting)
+
   IMPROVEMENTS
 
     AVRO-71.  C++: make deserializer more generic.  (Scott Banachowski

Added: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java?rev=809680&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java Mon Aug 31 18:05:21 2009
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+
+/** Utilities for binary-encoded data. */
+public class BinaryData {
+
+  private BinaryData() {}                      // no public ctor
+
+  /** A read position within one of the two buffers being compared.  The two
+   * sides advance independently: equal values need not have byte-identical
+   * encodings (for example, an array may be split into blocks differently),
+   * so a single shared offset would misread whatever follows. */
+  private static final class Cursor {
+    final byte[] buf;
+    int pos;
+
+    Cursor(byte[] buf, int pos) {
+      this.buf = buf;
+      this.pos = pos;
+    }
+
+    /** Read a zig-zag encoded variable-length long and advance past it.
+     * @throws AvroRuntimeException if the varint is longer than ten bytes */
+    long readLong() {
+      long n = 0;
+      for (int shift = 0; ; shift += 7) {
+        if (shift > 63)                           // malformed: over 10 bytes
+          throw new AvroRuntimeException("Invalid long encoding.");
+        long b = buf[pos++];
+        n |= (b & 0x7F) << shift;
+        if ((b & 0x80) == 0)
+          break;
+      }
+      return (n >>> 1) ^ -(n & 1);                // back to two's-complement
+    }
+
+    /** Read a varint that must fit in an int and advance past it.
+     * @throws AvroRuntimeException if the value does not fit in an int */
+    int readInt() {
+      long l = readLong();
+      if (l < Integer.MIN_VALUE || Integer.MAX_VALUE < l)
+        throw new AvroRuntimeException("Integer overflow.");
+      return (int)l;
+    }
+
+    /** Read {@code count} bytes as a little-endian value and advance past
+     * them.  Used for the raw bits of floats (4 bytes) and doubles (8). */
+    long readFixedLE(int count) {
+      long n = 0;
+      for (int i = 0, shift = 0; i < count; i++, shift += 8)
+        n |= (buf[pos++] & 0xffL) << shift;
+      return n;
+    }
+  }
+
+  /** Compare binary encoded data.  If equal, return zero.  If greater-than,
+   * return 1, if less than return -1.
+   * @param b1 buffer holding the first datum
+   * @param s1 offset of the first datum within {@code b1}
+   * @param b2 buffer holding the second datum
+   * @param s2 offset of the second datum within {@code b2}
+   * @param schema the schema both data were written with
+   * @throws AvroRuntimeException if the schema contains a map, or if a
+   * varint is malformed or overflows its type */
+  public static int compare(byte[] b1, int s1,
+                            byte[] b2, int s2,
+                            Schema schema) {
+    return comp(new Cursor(b1, s1), new Cursor(b2, s2), schema);
+  }
+
+  /** Compare the next datum of {@code schema} at each cursor, returning -1,
+   * 0 or 1.  When the data are equal, both cursors are left just past them.
+   * Otherwise the cursor positions are unspecified, because comparison stops
+   * at the first difference. */
+  private static int comp(Cursor c1, Cursor c2, Schema schema) {
+    switch (schema.getType()) {
+    case RECORD:
+      for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
+        int c = comp(c1, c2, entry.getValue());
+        if (c != 0) return c;
+      }
+      return 0;
+    case ENUM: case INT: case LONG:
+      return compareLongs(c1.readLong(), c2.readLong());
+    case ARRAY:
+      return compareArrays(c1, c2, schema.getElementType());
+    case MAP:
+      throw new AvroRuntimeException("Can't compare maps!");
+    case UNION: {
+      int i1 = c1.readInt();
+      int i2 = c2.readInt();
+      if (i1 != i2)                               // order by branch first
+        return (i1 > i2) ? 1 : -1;
+      // Both cursors are now past the branch index, so the branch value (and
+      // anything after it) is read from the correct offset.
+      return comp(c1, c2, schema.getTypes().get(i1));
+    }
+    case FIXED: {
+      int size = schema.getFixedSize();
+      return compareSpans(c1, size, c2, size);
+    }
+    case STRING: case BYTES: {
+      int l1 = c1.readInt();
+      int l2 = c2.readInt();
+      return compareSpans(c1, l1, c2, l2);
+    }
+    case FLOAT: {
+      float f1 = Float.intBitsToFloat((int)c1.readFixedLE(4));
+      float f2 = Float.intBitsToFloat((int)c2.readFixedLE(4));
+      // NOTE(review): NaN compares less than everything, itself included;
+      // Float.compare would give a total order -- confirm which is intended.
+      return (f1 == f2) ? 0 : ((f1 > f2) ? 1 : -1);
+    }
+    case DOUBLE: {
+      double d1 = Double.longBitsToDouble(c1.readFixedLE(8));
+      double d2 = Double.longBitsToDouble(c2.readFixedLE(8));
+      // NOTE(review): same NaN caveat as FLOAT above.
+      return (d1 == d2) ? 0 : ((d1 > d2) ? 1 : -1);
+    }
+    case BOOLEAN: {
+      byte x1 = c1.buf[c1.pos++];
+      byte x2 = c2.buf[c2.pos++];
+      return compareLongs(x1, x2);
+    }
+    case NULL:
+      return 0;                                   // encoded as zero bytes
+    default:
+      throw new AvroRuntimeException("Unexpected schema to compare!");
+    }
+  }
+
+  /** Compare two block-encoded arrays element by element, returning -1, 0
+   * or 1.  Each side is read block by block on its own, since the two may be
+   * blocked differently.  If one array is a prefix of the other, the shorter
+   * one is less. */
+  private static int compareArrays(Cursor c1, Cursor c2, Schema elementType) {
+    long i = 0;                                   // position in array
+    long r1 = 0, r2 = 0;                          // remaining in current block
+    long l1 = 0, l2 = 0;                          // total array length so far
+    while (true) {
+      if (r1 == 0) {                              // refill block(s)
+        r1 = readBlockCount(c1);
+        l1 += r1;
+      }
+      if (r2 == 0) {
+        r2 = readBlockCount(c2);
+        l2 += r2;
+      }
+      if (r1 == 0 || r2 == 0)                     // empty block: done
+        return compareLongs(l1, l2);
+      long l = Math.min(l1, l2);
+      while (i < l) {                             // compare to end of block
+        int c = comp(c1, c2, elementType);
+        if (c != 0) return c;
+        i++; r1--; r2--;
+      }
+    }
+  }
+
+  /** Read an array block's item count.  A negative count means its absolute
+   * value is the item count and it is followed by the block's size in bytes,
+   * which is not needed here and is skipped.  Without this, a negative count
+   * would never reach zero and the array loop would not terminate. */
+  private static long readBlockCount(Cursor c) {
+    long count = c.readLong();
+    if (count < 0) {
+      c.readLong();                               // skip block size in bytes
+      count = -count;
+    }
+    return count;
+  }
+
+  /** Lexicographically compare the next {@code l1} and {@code l2} bytes at
+   * the two cursors, advance each cursor past its span, and return -1, 0
+   * or 1. */
+  private static int compareSpans(Cursor c1, int l1, Cursor c2, int l2) {
+    int c = compareBytes(c1.buf, c1.pos, l1, c2.buf, c2.pos, l2);
+    c1.pos += l1;
+    c2.pos += l2;
+    return Integer.signum(c);
+  }
+
+  /** Three-way comparison: -1, 0 or 1 as {@code a} is less than, equal to or
+   * greater than {@code b}. */
+  private static int compareLongs(long a, long b) {
+    return (a == b) ? 0 : ((a > b) ? 1 : -1);
+  }
+
+  /** Lexicographically compare bytes, treating each byte as unsigned.  If
+   * equal, return zero.  If greater-than, return a positive value, if less
+   * than return a negative value.  When one range is a prefix of the other,
+   * the shorter range is less.
+   * @param b1 first buffer
+   * @param s1 start offset in {@code b1}
+   * @param l1 number of bytes to compare from {@code b1}
+   * @param b2 second buffer
+   * @param s2 start offset in {@code b2}
+   * @param l2 number of bytes to compare from {@code b2} */
+  public static int compareBytes(byte[] b1, int s1, int l1,
+                                 byte[] b2, int s2, int l2) {
+    int end1 = s1 + l1;
+    int end2 = s2 + l2;
+    for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
+      int a = (b1[i] & 0xff);
+      int b = (b2[j] & 0xff);
+      if (a != b) {
+        return a - b;
+      }
+    }
+    return l1 - l2;
+  }
+
+}

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java?rev=809680&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java Mon Aug 31 18:05:21 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.util.Utf8;
+
+public class TestCompare {
+
+  @Test
+  public void testNull() throws Exception {
+    Schema schema = Schema.parse("\"null\"");
+    byte[] b = render(null, schema);
+    assertEquals(0, BinaryData.compare(b, 0, b, 0, schema));
+  }
+
+  @Test
+  public void testBoolean() throws Exception {
+    check("\"boolean\"", Boolean.FALSE, Boolean.TRUE);
+  }
+
+  @Test
+  public void testString() throws Exception {
+    check("\"string\"", new Utf8("a"), new Utf8("b"));
+    check("\"string\"", new Utf8("a"), new Utf8("ab"));
+  }
+
+  @Test
+  public void testBytes() throws Exception {
+    check("\"bytes\"",
+          ByteBuffer.wrap(new byte[]{}),
+          ByteBuffer.wrap(new byte[]{1}));
+    check("\"bytes\"",
+          ByteBuffer.wrap(new byte[]{1}),
+          ByteBuffer.wrap(new byte[]{2}));
+    check("\"bytes\"",
+          ByteBuffer.wrap(new byte[]{1,2}),
+          ByteBuffer.wrap(new byte[]{2}));
+  }
+
+  @Test
+  public void testInt() throws Exception {
+    check("\"int\"", new Integer(-1), new Integer(0));
+    check("\"int\"", new Integer(0), new Integer(1));
+  }
+
+  @Test
+  public void testLong() throws Exception {
+    check("\"long\"", new Long(11), new Long(12));
+    check("\"long\"", new Long(-1), new Long(1));
+  }
+
+  @Test
+  public void testFloat() throws Exception {
+    check("\"float\"", new Float(1.1), new Float(1.2));
+    check("\"float\"", new Float(-1.1), new Float(1.0));
+  }
+
+  @Test
+  public void testDouble() throws Exception {
+    check("\"double\"", new Double(1.2), new Double(1.3));
+    check("\"double\"", new Double(-1.2), new Double(1.3));
+  }
+
+  @Test
+  public void testArray() throws Exception {
+    GenericArray<Long> a1 = new GenericData.Array<Long>(1);
+    a1.add(1L);
+    GenericArray<Long> a2 = new GenericData.Array<Long>(1);
+    a2.add(1L);
+    a2.add(0L);
+    check("{\"type\":\"array\", \"items\": \"long\"}", a1, a2);
+  }
+
+
+  @Test
+  public void testRecord() throws Exception {
+    String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":"
+      +"[{\"name\":\"f\",\"type\":\"int\"},{\"name\":\"g\",\"type\":\"int\"}]}";
+    Schema schema = Schema.parse(recordJson);
+    GenericData.Record r1 = new GenericData.Record(schema);
+    r1.put("f", 11);
+    r1.put("g", 12);
+    GenericData.Record r2 = new GenericData.Record(schema);
+    r2.put("f", 11);
+    r2.put("g", 13);
+    check(recordJson, r1, r2);
+  }
+
+  @Test
+  public void testEnum() throws Exception {
+    check("{\"type\":\"enum\", \"name\":\"Test\",\"symbols\": [\"A\", \"B\"]}",
+          "A", "B");
+  }
+
+  @Test
+  public void testFixed() throws Exception {
+    check("{\"type\": \"fixed\", \"name\":\"Test\", \"size\": 1}",
+          new GenericData.Fixed(new byte[]{(byte)'a'}),
+          new GenericData.Fixed(new byte[]{(byte)'b'}));
+  }
+
+  @Test
+  public void testUnion() throws Exception {
+    check("[\"string\", \"long\"]", new Utf8("a"), new Utf8("b"));
+    check("[\"string\", \"long\"]", new Long(1), new Long(2));
+    check("[\"string\", \"long\"]", new Utf8("a"), new Long(1));
+  }
+
+  private static void check(String schemaJson, Object o1, Object o2)
+    throws Exception {
+    Schema schema = Schema.parse(schemaJson);
+    byte[] b1 = render(o1, schema);
+    byte[] b2 = render(o2, schema);
+    assertEquals(-1, BinaryData.compare(b1, 0, b2, 0, schema));
+    assertEquals(1, BinaryData.compare(b2, 0, b1, 0, schema));
+    assertEquals(0, BinaryData.compare(b1, 0, b1, 0, schema));
+    assertEquals(0, BinaryData.compare(b2, 0, b2, 0, schema));
+  }
+
+  private static byte[] render(Object datum, Schema schema)
+    throws IOException {
+    DatumWriter<Object> writer = new GenericDatumWriter<Object>();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    writer.setSchema(schema);
+    writer.write(datum, new BinaryEncoder(out));
+    return out.toByteArray();
+  }
+}