You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/31 20:05:22 UTC
svn commit: r809680 - in /hadoop/avro/trunk: CHANGES.txt
src/java/org/apache/avro/io/BinaryData.java
src/test/java/org/apache/avro/TestCompare.java
Author: cutting
Date: Mon Aug 31 18:05:21 2009
New Revision: 809680
URL: http://svn.apache.org/viewvc?rev=809680&view=rev
Log:
AVRO-108. Add Java implementation of binary comparator.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
Modified:
hadoop/avro/trunk/CHANGES.txt
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=809680&r1=809679&r2=809680&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Mon Aug 31 18:05:21 2009
@@ -17,6 +17,9 @@
AVRO-92. Describe JSON data encoding in specification
document. (cutting)
+ AVRO-108. Add Java implementation of binary comparator.
+ (cutting)
+
IMPROVEMENTS
AVRO-71. C++: make deserializer more generic. (Scott Banachowski
Added: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java?rev=809680&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java Mon Aug 31 18:05:21 2009
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * Utilities for binary-encoded data.
+ *
+ * <p>The comparison offered here works directly on the serialized bytes of two
+ * datums that were written with the same schema; nothing is deserialized into
+ * objects.
+ */
+public class BinaryData {
+
+  private BinaryData() {}                      // no public ctor
+
+  // comp() reports "equal" as a non-negative byte count, so the two unequal
+  // outcomes are signalled with distinct negative sentinels.
+  private static final int GT = -1;
+  private static final int LT = -2;
+
+  /** Compare binary encoded data.  If equal, return zero.  If greater-than,
+   * return 1, if less than return -1.
+   *
+   * @param b1 buffer containing the first datum
+   * @param s1 offset in <code>b1</code> at which the first datum starts
+   * @param b2 buffer containing the second datum
+   * @param s2 offset in <code>b2</code> at which the second datum starts
+   * @param schema schema both datums are read with
+   * @return 0, 1 or -1 as described above
+   * @throws AvroRuntimeException if the schema contains a map, or an encoded
+   * length or union index does not fit in an int
+   */
+  public static int compare(byte[] b1, int s1,
+                            byte[] b2, int s2,
+                            Schema schema) {
+    int comp = comp(b1, s1, b2, s2, schema);      // compare
+    return (comp >= 0) ? 0 : ((comp == GT) ? 1 : -1); // decode comparison
+  }
+
+  /** If equal, return the number of bytes consumed.  If greater than, return
+   * GT, if less than, return LT.
+   *
+   * <p>A single byte count is returned and callers advance both buffers by
+   * it, so two datums that compare equal are assumed to have encodings of the
+   * same length. */
+  private static int comp(byte[] b1, int s1, byte[] b2, int s2, Schema schema) {
+    switch (schema.getType()) {
+    case RECORD: {
+      // fields are compared in order; the first unequal field decides
+      int size = 0;
+      for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
+        int comp = comp(b1, s1+size, b2, s2+size, entry.getValue());
+        if (comp < 0) return comp;
+        size += comp;
+      }
+      return size;
+    }
+    case ENUM: case INT: case LONG: {
+      // all three are read with the same variable-length decoder
+      long l1 = readLong(b1, s1);
+      long l2 = readLong(b2, s2);
+      return (l1 == l2) ? longSize(l1) : ((l1 > l2) ? GT : LT);
+    }
+    case ARRAY: {
+      long i = 0;                                 // position in array
+      long r1 = 0, r2 = 0;                        // remaining in current block
+      long l1 = 0, l2 = 0;                        // total array length
+      int size1 = 0, size2 = 0;                   // total array size
+      while (true) {
+        if (r1 == 0) {                            // refill block(s)
+          int p = s1+size1;
+          // a negative count means "abs(count) items, byte size follows"
+          r1 = Math.abs(readLong(b1, p));
+          size1 += blockHeaderSize(b1, p);
+          l1 += r1;
+        }
+        if (r2 == 0) {
+          int p = s2+size2;
+          r2 = Math.abs(readLong(b2, p));
+          size2 += blockHeaderSize(b2, p);
+          l2 += r2;
+        }
+        if (r1 == 0 || r2 == 0)                   // empty block: done
+          return (l1 == l2) ? size1 : ((l1 > l2) ? GT : LT);
+        long l = Math.min(l1, l2);
+        while (i < l) {                           // compare to end of block
+          int comp = comp(b1, s1+size1, b2, s2+size2, schema.getElementType());
+          if (comp < 0) return comp;
+          size1 += comp;
+          size2 += comp;
+          i++; r1--; r2--;
+        }
+      }
+    }
+    case MAP:
+      throw new AvroRuntimeException("Can't compare maps!");
+    case UNION: {
+      // order by branch index first, then by the value within the branch
+      int i1 = readInt(b1, s1);
+      int i2 = readInt(b2, s2);
+      if (i1 != i2)
+        return (i1 > i2) ? GT : LT;
+      int size = intSize(i1);
+      int c = comp(b1, s1+size, b2, s2+size, schema.getTypes().get(i1));
+      // when equal, the bytes consumed include the branch index itself;
+      // omitting it would misalign whatever follows the union
+      return (c < 0) ? c : size + c;
+    }
+    case FIXED: {
+      int size = schema.getFixedSize();
+      return result(compareBytes(b1, s1, size, b2, s2, size), size);
+    }
+    case STRING: case BYTES: {
+      // length prefix followed by that many bytes
+      int l1 = readInt(b1, s1);
+      int l2 = readInt(b2, s2);
+      int size1 = intSize(l1);
+      int size2 = intSize(l2);
+      int c = compareBytes(b1, s1+size1, l1, b2, s2+size2, l2);
+      return result(c, size1+l1);
+    }
+    case FLOAT: {
+      int n1 = 0, n2 = 0;
+      for (int i = 0, shift = 0; i < 4; i++, shift += 8) { // low byte first
+        n1 |= (b1[s1+i] & 0xff) << shift;
+        n2 |= (b2[s2+i] & 0xff) << shift;
+      }
+      float f1 = Float.intBitsToFloat(n1);
+      float f2 = Float.intBitsToFloat(n2);
+      return result(compareFloats(f1, f2), 4);
+    }
+    case DOUBLE: {
+      long n1 = 0, n2 = 0;
+      for (int i = 0, shift = 0; i < 8; i++, shift += 8) { // low byte first
+        n1 |= (b1[s1+i] & 0xffL) << shift;
+        n2 |= (b2[s2+i] & 0xffL) << shift;
+      }
+      double d1 = Double.longBitsToDouble(n1);
+      double d2 = Double.longBitsToDouble(n2);
+      return result(compareDoubles(d1, d2), 8);
+    }
+    case BOOLEAN:
+      return b1[s1] == b2[s2] ? 1 : ((b1[s1] > b2[s2]) ? GT : LT);
+    case NULL:
+      return 0;
+    default:
+      throw new AvroRuntimeException("Unexpected schema to compare!");
+    }
+  }
+
+  /** Translate a signed comparison into comp()'s convention: the byte count
+   * when zero, GT when positive, LT when negative. */
+  private static int result(int c, int size) {
+    return (c == 0) ? size : ((c > 0) ? GT : LT);
+  }
+
+  /** Number of bytes occupied by the array block header at <code>s</code>:
+   * the item count plus, when the count is negative, the long that follows
+   * it. */
+  private static int blockHeaderSize(byte[] b, int s) {
+    long count = readLong(b, s);
+    int size = longSize(count);
+    if (count < 0)
+      size += longSize(readLong(b, s+size));
+    return size;
+  }
+
+  /** Numeric comparison returning -1, 0 or 1.  Ordinary values compare as
+   * with the relational operators (so -0.0 equals 0.0); only when a NaN is
+   * involved, where those operators are all false, is Float.compare used so
+   * that the ordering stays consistent in both argument orders. */
+  private static int compareFloats(float f1, float f2) {
+    if (f1 == f2) return 0;
+    if (f1 > f2) return 1;
+    if (f1 < f2) return -1;
+    return Float.compare(f1, f2);
+  }
+
+  /** Double counterpart of {@link #compareFloats(float, float)}. */
+  private static int compareDoubles(double d1, double d2) {
+    if (d1 == d2) return 0;
+    if (d1 > d2) return 1;
+    if (d1 < d2) return -1;
+    return Double.compare(d1, d2);
+  }
+
+  /** Lexicographically compare bytes.  If equal, return zero.  If greater-than,
+   * return a positive value, if less than return a negative value.
+   *
+   * <p>Bytes are compared as unsigned values.  When one range is a prefix of
+   * the other, the shorter one is the lesser.
+   *
+   * @param b1 first buffer
+   * @param s1 start offset in <code>b1</code>
+   * @param l1 number of bytes to compare from <code>b1</code>
+   * @param b2 second buffer
+   * @param s2 start offset in <code>b2</code>
+   * @param l2 number of bytes to compare from <code>b2</code>
+   */
+  public static int compareBytes(byte[] b1, int s1, int l1,
+                                 byte[] b2, int s2, int l2) {
+    int end1 = s1 + l1;
+    int end2 = s2 + l2;
+    for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
+      int a = (b1[i] & 0xff);
+      int b = (b2[j] & 0xff);
+      if (a != b) {
+        return a - b;
+      }
+    }
+    return l1 - l2;
+  }
+
+  /** Read a variable-length value and check that it fits in an int.
+   * @throws AvroRuntimeException if the decoded value is out of int range */
+  private static int readInt(byte[] b, int s) {
+    long l = readLong(b, s);
+    if (l < Integer.MIN_VALUE || Integer.MAX_VALUE < l)
+      throw new AvroRuntimeException("Integer overflow.");
+    return (int)l;
+  }
+
+  /** Decode the variable-length long starting at <code>s</code>: seven
+   * payload bits per byte, low-order group first, with the high bit of each
+   * byte set when another byte follows. */
+  private static long readLong(byte[] buffer, int s) {
+    long n = 0;
+    for (int shift = 0; ; shift += 7) {
+      long b = buffer[s++];
+      n |= (b & 0x7F) << shift;
+      if ((b & 0x80) == 0) {
+        break;
+      }
+    }
+    return (n >>> 1) ^ -(n & 1);                  // back to two's-complement
+  }
+
+  /** Number of bytes {@link #readLong} consumes for the int value i. */
+  private static int intSize(int i) { return longSize(i); }
+
+  /** Number of bytes the variable-length encoding of n occupies. */
+  private static int longSize(long n) {
+    int size = 1;
+    n = (n << 1) ^ (n >> 63);                     // move sign to low-order bit
+    while ((n & ~0x7F) != 0) {
+      size++;
+      n >>>= 7;
+    }
+    return size;
+  }
+
+}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java?rev=809680&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java Mon Aug 31 18:05:21 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.util.Utf8;
+
+public class TestCompare {
+
+ @Test
+ public void testNull() throws Exception {
+ Schema schema = Schema.parse("\"null\"");
+ byte[] b = render(null, schema);
+ assertEquals(0, BinaryData.compare(b, 0, b, 0, schema));
+ }
+
+ @Test
+ public void testBoolean() throws Exception {
+ check("\"boolean\"", Boolean.FALSE, Boolean.TRUE);
+ }
+
+ @Test
+ public void testString() throws Exception {
+ check("\"string\"", new Utf8("a"), new Utf8("b"));
+ check("\"string\"", new Utf8("a"), new Utf8("ab"));
+ }
+
+ @Test
+ public void testBytes() throws Exception {
+ check("\"bytes\"",
+ ByteBuffer.wrap(new byte[]{}),
+ ByteBuffer.wrap(new byte[]{1}));
+ check("\"bytes\"",
+ ByteBuffer.wrap(new byte[]{1}),
+ ByteBuffer.wrap(new byte[]{2}));
+ check("\"bytes\"",
+ ByteBuffer.wrap(new byte[]{1,2}),
+ ByteBuffer.wrap(new byte[]{2}));
+ }
+
+ @Test
+ public void testInt() throws Exception {
+ check("\"int\"", new Integer(-1), new Integer(0));
+ check("\"int\"", new Integer(0), new Integer(1));
+ }
+
+ @Test
+ public void testLong() throws Exception {
+ check("\"long\"", new Long(11), new Long(12));
+ check("\"long\"", new Long(-1), new Long(1));
+ }
+
+ @Test
+ public void testFloat() throws Exception {
+ check("\"float\"", new Float(1.1), new Float(1.2));
+ check("\"float\"", new Float(-1.1), new Float(1.0));
+ }
+
+ @Test
+ public void testDouble() throws Exception {
+ check("\"double\"", new Double(1.2), new Double(1.3));
+ check("\"double\"", new Double(-1.2), new Double(1.3));
+ }
+
+ @Test
+ public void testArray() throws Exception {
+ GenericArray<Long> a1 = new GenericData.Array<Long>(1);
+ a1.add(1L);
+ GenericArray<Long> a2 = new GenericData.Array<Long>(1);
+ a2.add(1L);
+ a2.add(0L);
+ check("{\"type\":\"array\", \"items\": \"long\"}", a1, a2);
+ }
+
+
+ @Test
+ public void testRecord() throws Exception {
+ String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":"
+ +"[{\"name\":\"f\",\"type\":\"int\"},{\"name\":\"g\",\"type\":\"int\"}]}";
+ Schema schema = Schema.parse(recordJson);
+ GenericData.Record r1 = new GenericData.Record(schema);
+ r1.put("f", 11);
+ r1.put("g", 12);
+ GenericData.Record r2 = new GenericData.Record(schema);
+ r2.put("f", 11);
+ r2.put("g", 13);
+ check(recordJson, r1, r2);
+ }
+
+ @Test
+ public void testEnum() throws Exception {
+ check("{\"type\":\"enum\", \"name\":\"Test\",\"symbols\": [\"A\", \"B\"]}",
+ "A", "B");
+ }
+
+ @Test
+ public void testFixed() throws Exception {
+ check("{\"type\": \"fixed\", \"name\":\"Test\", \"size\": 1}",
+ new GenericData.Fixed(new byte[]{(byte)'a'}),
+ new GenericData.Fixed(new byte[]{(byte)'b'}));
+ }
+
+ @Test
+ public void testUnion() throws Exception {
+ check("[\"string\", \"long\"]", new Utf8("a"), new Utf8("b"));
+ check("[\"string\", \"long\"]", new Long(1), new Long(2));
+ check("[\"string\", \"long\"]", new Utf8("a"), new Long(1));
+ }
+
+ private static void check(String schemaJson, Object o1, Object o2)
+ throws Exception {
+ Schema schema = Schema.parse(schemaJson);
+ byte[] b1 = render(o1, schema);
+ byte[] b2 = render(o2, schema);
+ assertEquals(-1, BinaryData.compare(b1, 0, b2, 0, schema));
+ assertEquals(1, BinaryData.compare(b2, 0, b1, 0, schema));
+ assertEquals(0, BinaryData.compare(b1, 0, b1, 0, schema));
+ assertEquals(0, BinaryData.compare(b2, 0, b2, 0, schema));
+ }
+
+ private static byte[] render(Object datum, Schema schema)
+ throws IOException {
+ DatumWriter<Object> writer = new GenericDatumWriter<Object>();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ writer.setSchema(schema);
+ writer.write(datum, new BinaryEncoder(out));
+ return out.toByteArray();
+ }
+}