You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/05 22:03:00 UTC
svn commit: r833166 [2/5] - in /hadoop/pig/trunk/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/
src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/pig/ src/java/org/apache/hadoop/zebra/pi...
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Base class for comparator expressions.
+ *
+ * {@link KeyGenerator} flattens an expression into a list of
+ * {@link LeafGenerator}s by calling
+ * {@link #appendLeafGenerator(List, int, int, boolean, boolean)} on it.
+ */
+public abstract class ComparatorExpr {
+ /**
+ * Append the leaf generator(s) that emit this expression's bytes to
+ * <code>list</code>.
+ *
+ * @param list
+ * the generator list being built; generators are later run in list
+ * order.
+ * @param el
+ * escape level (for 0x00 / 0x01 bytes) handed to the generators.
+ * @param cel
+ * complement escape level (for 0xFF / 0xFE bytes) handed to the
+ * generators.
+ * @param c
+ * whether the generated bytes are to be complemented.
+ * @param explicitBound
+ * whether the caller requires an explicit bound to be emitted after
+ * this expression's bytes.
+ */
+ protected abstract void appendLeafGenerator(List<LeafGenerator> list, int el,
+ int cel, boolean c, boolean explicitBound);
+
+ /**
+ * Print a textual form of this expression to <code>out</code>; used by
+ * {@link ExprUtils#exprToString(ComparatorExpr)}.
+ *
+ * @param out
+ * destination stream.
+ */
+ protected abstract void toString(PrintStream out);
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public class DoubleExpr extends FixedLengthPrimitive {
+  /** Number of bytes in the encoded form of a double. */
+  private static final int WIDTH = Double.SIZE / Byte.SIZE;
+
+  DoubleExpr(int index) {
+    super(index, WIDTH);
+  }
+
+  /**
+   * Encode a {@link Double} into <code>b</code> as 8 big-endian bytes whose
+   * unsigned byte-wise order follows the order of the raw IEEE-754 bit
+   * patterns: negative values have every bit flipped, non-negative values
+   * only the sign bit.
+   *
+   * @param b
+   *          destination buffer; must be exactly 8 bytes long.
+   * @param o
+   *          the value to encode; must be a {@link Double}.
+   */
+  @Override
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != WIDTH) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    long bits = Double.doubleToLongBits((Double) o);
+    bits = (bits < 0) ? ~bits : (bits ^ Long.MIN_VALUE);
+    // Store most-significant byte first.
+    for (int pos = WIDTH - 1; pos >= 0; --pos) {
+      b[pos] = (byte) bits;
+      bits >>= Byte.SIZE;
+    }
+  }
+
+  @Override
+  protected String getType() {
+    return "Double";
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+
+public class ExprUtils {
+
+  /**
+   * Make a bag comparator expression.
+   *
+   * <p>
+   * Example: suppose we have tuple schema as follows:
+   *
+   * <pre>
+   * Tuple {
+   *   int a;
+   *   String b;
+   *   Bag {
+   *     Tuple {
+   *       Bytes c;
+   *       int d;
+   *       String e;
+   *     }
+   *   } f
+   * }
+   * </pre>
+   *
+   * We would like to sort by
+   * <code>Tuple(b, Bag(Negate(Tuple(e, c))))</code>; we can construct the
+   * ComparatorExpr as follows:
+   *
+   * <pre>
+   * ComparatorExpr expr = tupleComparator(
+   *     primitiveComparator(1, DataType.CHARARRAY),
+   *     bagComparator(2,
+   *         negationComparator(
+   *             tupleComparator(
+   *                 primitiveComparator(2, DataType.CHARARRAY),
+   *                 primitiveComparator(0, DataType.BYTEARRAY)))));
+   * </pre>
+   *
+   * @param index
+   *          the index of datum in the source tuple that is a {@link DataBag}
+   *          object.
+   * @param expr
+   *          a comparator expression that corresponds to the member tuples in
+   *          the bag.
+   * @return A comparator expression.
+   */
+  public static ComparatorExpr bagComparator(int index, ComparatorExpr expr) {
+    return new BagExpr(index, expr);
+  }
+
+  /**
+   * Convert an expression to a string.
+   *
+   * @param expr
+   *          the expression to render.
+   * @return A string representation of the comparator expression.
+   */
+  public static String exprToString(ComparatorExpr expr) {
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    // The PrintStream encodes and the String constructor decodes with the
+    // same (platform default) charset.
+    PrintStream out = new PrintStream(bout);
+    expr.toString(out);
+    out.close();
+    return new String(bout.toByteArray());
+  }
+
+  /**
+   * Comparator for primitive types.
+   *
+   * @param index
+   *          Index in the source tuple.
+   * @param type
+   *          One of the constants defined in {@link DataType} for primitive
+   *          types.
+   * @return Comparator expression.
+   * @throws IllegalArgumentException
+   *           if <code>type</code> is not one of the supported primitive
+   *           type constants.
+   */
+  public static ComparatorExpr primitiveComparator(int index, int type) {
+    switch (type) {
+      case DataType.BOOLEAN:
+        return new BooleanExpr(index);
+      case DataType.BYTE:
+        return new ByteExpr(index);
+      case DataType.BYTEARRAY:
+        return new BytesExpr(index);
+      case DataType.CHARARRAY:
+        return new StringExpr(index);
+      case DataType.DOUBLE:
+        return new DoubleExpr(index);
+      case DataType.FLOAT:
+        return new FloatExpr(index);
+      case DataType.INTEGER:
+        return new IntExpr(index);
+      case DataType.LONG:
+        return new LongExpr(index);
+      default:
+        // IllegalArgumentException is a RuntimeException, so callers that
+        // caught the previous exception type are unaffected.
+        throw new IllegalArgumentException("Not a primitive PIG type: " + type);
+    }
+  }
+
+  /**
+   * Negate comparator
+   *
+   * @param expr
+   *          expression to perform negation on.
+   * @return A comparator expression.
+   */
+  public static ComparatorExpr negationComparator(ComparatorExpr expr) {
+    return NegateExpr.makeNegateExpr(expr);
+  }
+
+  /**
+   * Make a Tuple comparator expression.
+   *
+   * @param exprs
+   *          member comparator expressions.
+   * @return A comparator expression.
+   */
+  public static ComparatorExpr tupleComparator(
+      Collection<? extends ComparatorExpr> exprs) {
+    return TupleExpr.makeTupleComparator(exprs);
+  }
+
+  /**
+   * Make a Tuple comparator expression.
+   *
+   * @param exprs
+   *          member comparator expressions.
+   * @return A comparator expression.
+   */
+  public static ComparatorExpr tupleComparator(ComparatorExpr... exprs) {
+    return TupleExpr.makeTupleExpr(exprs);
+  }
+
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+/**
+ * Helper class for fixed length primitives. Each value is converted into a
+ * byte array of the length given at construction and written to the output
+ * stream in one call.
+ */
+public abstract class FixedLengthPrimitive extends LeafExpr {
+  /**
+   * Scratch buffer reused by every {@link #appendObject} call on this
+   * instance; its contents are overwritten on each call.
+   */
+  protected byte[] bytes;
+
+  /**
+   * Constructor
+   *
+   * @param index
+   *          tuple position index, passed to {@link LeafExpr}.
+   * @param length
+   *          number of bytes in the encoded form of the primitive.
+   */
+  protected FixedLengthPrimitive(int index, int length) {
+    super(index);
+    bytes = new byte[length];
+  }
+
+  /**
+   * Encode <code>o</code> into <code>b</code>.
+   *
+   * @param b
+   *          destination buffer.
+   * @param o
+   *          the value to encode.
+   */
+  protected abstract void convertValue(byte[] b, Object o);
+
+  /**
+   * Encode <code>o</code> into the scratch buffer and write the whole buffer
+   * to <code>out</code>.
+   */
+  @Override
+  protected void appendObject(EncodingOutputStream out, Object o) {
+    convertValue(bytes, o);
+    out.write(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Always <code>false</code>, so {@link LeafExpr#appendLeafGenerator} never
+   * adds the extra escape level and trailing bound for these leaves.
+   */
+  @Override
+  protected boolean implicitBound() {
+    return false;
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class FloatExpr extends FixedLengthPrimitive {
+  /** Number of bytes in the encoded form of a float. */
+  private static final int WIDTH = Float.SIZE / Byte.SIZE;
+
+  public FloatExpr(int index) {
+    super(index, WIDTH);
+  }
+
+  /**
+   * Encode a {@link Float} into <code>b</code> as 4 big-endian bytes whose
+   * unsigned byte-wise order follows the order of the raw IEEE-754 bit
+   * patterns: negative values have every bit flipped, non-negative values
+   * only the sign bit.
+   *
+   * @param b
+   *          destination buffer; must be exactly 4 bytes long.
+   * @param o
+   *          the value to encode; must be a {@link Float}.
+   */
+  @Override
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != WIDTH) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    int bits = Float.floatToIntBits((Float) o);
+    bits = (bits < 0) ? ~bits : (bits ^ Integer.MIN_VALUE);
+    // Store most-significant byte first.
+    for (int pos = WIDTH - 1; pos >= 0; --pos) {
+      b[pos] = (byte) bits;
+      bits >>= Byte.SIZE;
+    }
+  }
+
+  @Override
+  protected String getType() {
+    return "Float";
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class IntExpr extends FixedLengthPrimitive {
+  public IntExpr(int index) {
+    super(index, Integer.SIZE / Byte.SIZE);
+  }
+
+  /**
+   * Encode an {@link Integer} into <code>b</code> as 4 big-endian bytes with
+   * the sign bit flipped, so that unsigned byte-wise comparison of the output
+   * matches signed integer comparison of the input.
+   *
+   * @param b
+   *          destination buffer; must be exactly 4 bytes long.
+   * @param o
+   *          the value to encode; must be an {@link Integer}.
+   */
+  @Override
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Integer.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    int l = (Integer) o;
+    // Flip the sign bit: negatives map below non-negatives when compared
+    // as unsigned bytes.
+    l ^= 1 << (Integer.SIZE - 1);
+    b[3] = (byte) l;
+    b[2] = (byte) (l >> 8);
+    b[1] = (byte) (l >> 16);
+    b[0] = (byte) (l >> 24);
+  }
+
+  @Override
+  protected String getType() {
+    return "Integer";
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Extended from ByteArrayOutputStream with direct access to the underlying byte
+ * array and explicit capacity expansion.
+ *
+ * Also adding the capability of escaping:
+ *
+ * <code>
+ * el - escape level for 0x00, valid value 0-252
+ * cel - escape level for 0xff, valid value 0-252
+ * escaped
+ * el 0x00 0x01 others
+ * 0 AS-IS AS-IS AS-IS
+ * >0 0x01 0x01+el 0x01FD AS-IS
+ * escaped
+ * cel 0xFF 0xFE others
+ * 0 AS-IS AS-IS AS-IS
+ * >0 0xFE 0xFE-cel 0xFE02 AS-IS
+ * </code>
+ */
+class EncodingOutputStream extends ByteArrayOutputStream {
+  /** Escape level for 0x00 / 0x01; 0 disables escaping of those bytes. */
+  int escapeLevel = 0;
+  /** Escape level for 0xFF / 0xFE; 0 disables escaping of those bytes. */
+  int comescLevel = 0;
+  /** When true, every byte written is bit-complemented. */
+  boolean complement = false;
+
+  public EncodingOutputStream() {
+    super();
+  }
+
+  public EncodingOutputStream(int size) {
+    super(size);
+  }
+
+  /**
+   * @return the internal buffer itself (not a copy); only the first
+   *         {@link #size()} bytes are valid.
+   */
+  public byte[] get() {
+    return buf;
+  }
+
+  /**
+   * Grow the internal buffer, if needed, so that <code>len</code> more bytes
+   * fit. The buffer at least doubles when it grows.
+   */
+  public void ensureAvailable(int len) {
+    int newcount = count + len;
+    if (newcount > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));
+    }
+  }
+
+  /**
+   * Set the encoding applied to subsequent writes.
+   *
+   * @param el
+   *          escape level for 0x00 / 0x01.
+   * @param cel
+   *          escape level for 0xFF / 0xFE.
+   * @param c
+   *          whether to complement written bytes.
+   */
+  public void setEscapeParams(int el, int cel, boolean c) {
+    escapeLevel = el;
+    comescLevel = cel;
+    complement = c;
+  }
+
+  public int getEscapeLevel() {
+    return escapeLevel;
+  }
+
+  public int getComescLevel() {
+    return comescLevel;
+  }
+
+  public boolean getComplement() {
+    return complement;
+  }
+
+  /**
+   * Write the two-byte sequence <code>0x01, v</code>, complementing both
+   * bytes when <code>c</code> is true.
+   */
+  void writeEscaped(int v, boolean c) {
+    ensureAvailable(2);
+    buf[count] = 0x01;
+    buf[count + 1] = (byte) v;
+    if (c) {
+      buf[count] = (byte) (~buf[count]);
+      buf[count + 1] = (byte) (~buf[count + 1]);
+    }
+    count += 2;
+  }
+
+  /**
+   * Write an escaped 0x00.
+   */
+  void escape00() {
+    writeEscaped(escapeLevel + 1, complement);
+  }
+
+  /**
+   * Write an escaped 0x01.
+   */
+  void escape01() {
+    writeEscaped(0xFD, complement);
+  }
+
+  /**
+   * Write an escaped 0xFE
+   */
+  void escapeFE() {
+    writeEscaped(0xFD, !complement);
+  }
+
+  /**
+   * write an escaped 0xFF
+   */
+  void escapeFF() {
+    writeEscaped(comescLevel + 1, !complement);
+  }
+
+  /**
+   * Copy <code>b[begin, end)</code> to the output without escaping,
+   * complementing each byte when {@link #complement} is set.
+   */
+  void complement(byte b[], int begin, int end) {
+    if (begin >= end) return;
+    ensureAvailable(end - begin);
+    if (!complement) {
+      System.arraycopy(b, begin, buf, count, end - begin);
+      count += (end - begin);
+    } else {
+      for (int i = begin; i < end; ++i) {
+        buf[count++] = (byte) ~b[i];
+      }
+    }
+  }
+
+  /**
+   * Write the escaped form of one of the four special byte values.
+   *
+   * @param b
+   *          unsigned byte value (0-255); values other than 0x00, 0x01, 0xFE
+   *          and 0xFF produce no output.
+   */
+  void escape(int b) {
+    switch (b) {
+      case 0:
+        escape00();
+        break;
+      case 1:
+        escape01();
+        break;
+      case 0xfe:
+        escapeFE();
+        break;
+      case 0xff:
+        escapeFF();
+        break;
+    }
+  }
+
+  /**
+   * Write one byte, applying the current escape and complement settings.
+   *
+   * @param b
+   *          the byte to write; as with {@link java.io.OutputStream#write(int)}
+   *          only the low-order 8 bits are used.
+   */
+  @Override
+  public void write(int b) {
+    // Reduce to the unsigned byte value first. Without this a negative
+    // argument (e.g. a sign-extended 0xFF) is classified as "low", matches
+    // no escape case and is silently dropped, or bypasses high escaping.
+    int v = b & 0xff;
+    if (!shouldEscape(v, escapeLevel > 0, comescLevel > 0)) {
+      ensureAvailable(1);
+      if (complement) {
+        buf[count++] = (byte) ~v;
+      } else {
+        buf[count++] = (byte) v;
+      }
+    } else {
+      escape(v);
+    }
+  }
+
+  @Override
+  public void write(byte b[]) {
+    write(b, 0, b.length);
+  }
+
+  /**
+   * @param b
+   *          unsigned byte value (0-255).
+   * @param checkLow
+   *          whether 0x00 / 0x01 need escaping.
+   * @param checkHigh
+   *          whether 0xFE / 0xFF need escaping.
+   * @return true if <code>b</code> must be written in escaped form.
+   */
+  static boolean shouldEscape(int b, boolean checkLow, boolean checkHigh) {
+    if (checkLow && b < 0x2) return true;
+    if (checkHigh && b > 0xfd) return true;
+    return false;
+  }
+
+  /**
+   * Write <code>len</code> bytes starting at <code>off</code>, applying the
+   * current escape and complement settings. Runs of bytes that need no
+   * escaping are copied in bulk.
+   */
+  @Override
+  public void write(byte b[], int off, int len) {
+    if ((escapeLevel > 0) || (comescLevel > 0)) {
+      ensureAvailable(len);
+      int begin = off;
+      int next = begin;
+      int end = off + len;
+      for (; begin < end; begin = next) {
+        // Find the end of the current run of plain bytes.
+        while ((next < end)
+            && (!shouldEscape(b[next] & 0xff, escapeLevel > 0, comescLevel > 0))) {
+          ++next;
+        }
+        complement(b, begin, next);
+        if (next < end) {
+          escape(b[next] & 0xff);
+          ++next;
+        }
+      }
+    } else {
+      complement(b, off, off + len);
+    }
+  }
+}
+
+/**
+ * Generating binary keys for algorithmic comparators. A user may construct an
+ * algorithmic comparator by creating a ComparatorExpr object (through various
+ * static methods in {@link ExprUtils}). She could then create a KeyGenerator object
+ * and use it to create binary keys for tuple. The KeyGenerator object can be
+ * reused for different tuples that conform to the same schema. Sorting the
+ * tuples by the binary key yields the same ordering as sorting by the
+ * algorithmic comparator.
+ *
+ * Basic idea (without optimization):
+ * <ul>
+ * <li>define two operations: escape and complement, that takes in a byte array,
+ * and outputs a byte array:
+ *
+ * <pre>
+ * escape(byte[] bytes) {
+ * for (byte b : bytes) {
+ * if (b == 0)
+ * emit(0x1, 0x0);
+ * else if (b == 1)
+ * emit(0x1, 0x2);
+ * else emit(b);
+ * }
+ * }
+ *
+ * complement(byte[] bytes) {
+ * for (byte b : bytes) {
+ * emit(~b);
+ * }
+ * }
+ * </pre>
+ *
+ * <li>find ways to convert primitive types to bytes that compares in the same
+ * order as those objects.
+ * <li>operations:
+ * <ul>
+ * <li>negate(byte[] bytes) == complement(escape(bytes) + 0x0);
+ * <li>tuple(byte[] bytes1, byte[] bytes2) == escape(bytes1) + 0x0 +
+ * escape(bytes2)
+ * <li>bag(byte[] bytes1, byte[] bytes2, ... ) = escape(bytes1) + 0x0 +
+ * escape(bytes2) + ...
+ * </ul>
+ * <li>optimizations:
+ * <ul>
+ * <li>negate(negate(bytes)) == bytes;
+ * <li>tuple(a) == a;
+ * <li>tuple(a, tuple(b, c)) == tuple(a, b, c)
+ * <li>the actual output would be a concatenation of f1(o1), f2(o2), ..., where
+ * o1, o2, are leaf datums in the tuple or 0x0, and fi(oi) is a nested function
+ * of escape() and complement() calls.
+ * <li>The invariance we want to preserve is that escape(0x1) >
+ * escape(escape(0x0)) > escape(0x0) > 0x0. In the basic algorithm, these are
+ * escaped as 0x0102, 0x010100, 0x0100, 0x00, and are thus variable length. We
+ * can actually collapse nested consecutive calls of
+ * escape(escape(...escape(0))...) to escape(i, 0), where i is the level of
+ * nesting, and fi may be represented as nested inter-leaved calling of
+ * complement(bytes) and escape(i, bytes), where escape (i, 0x0) == 0x01 +
+ * (0x01+i) and escape(i, 1) == 0x010xFD. We do limit the total nesting depth by
+ * 252, which should be plenty.
+ * <li>we can further optimize fi as either escape(i, j, bytes) or
+ * complement(escape(i, j, bytes), where i is the level of nesting for escaping
+ * 0x0 and 0x1, and j the level of nesting for escaping 0xff and 0xfe.
+ * <li>If the binary keys being generated from a certain comparator either
+ * compare equal or differ at some byte position, but never the case where one
+ * is a prefix of another, then we do not need to add padding 0x0 for negate()
+ * or tuple(). This is captured by the method implicitBound() in ComparatorExpr.
+ * <li>We figure out how datums should be extracted from a tuple and being
+ * escaped only once for any expression, and write to a modified
+ * ByteArrayOutputStream in one pass.
+ * </ul>
+ * </ul>
+ *
+ * TODO Remove the strong dependency with Pig by adding a DatumExtractor
+ * interface that allow applications to extract leaf datum from user objects,
+ * something like the following:
+ *
+ * <pre>
+ * interface DatumExtractor {
+ * Object extract(Object o);
+ * }
+ * </pre>
+ *
+ * And user may do something like this:
+ *
+ * <pre>
+ * class MyObject {
+ * int a;
+ * String b;
+ * }
+ *
+ * ComparatorExpr expr = KeyBuilder.createLeafExpr(new DatumExtractor {
+ * Object extract(Object o) {
+ * MyObject obj = (MyObject)o;
+ * return obj.b;
+ * } }, DataType.CHARARRAY);
+ * </pre>
+ *
+ * TODO Change BagExpr to IteratorExpr, so that it may be used in more general
+ * context (any Java collection).
+ *
+ * TODO Add an ArrayExpr (for Java []).
+ */
+public class KeyGenerator {
+  /** Reused output buffer; reset at the start of every generateKey call. */
+  private final EncodingOutputStream out;
+  /** Leaf generators for the current expression, in output order. */
+  private final List<LeafGenerator> list;
+
+  /**
+   * Create a key builder that can generate binary keys for the input key
+   * expression.
+   *
+   * @param expr
+   *          comparator expression
+   */
+  public KeyGenerator(ComparatorExpr expr) {
+    out = new EncodingOutputStream();
+    list = new ArrayList<LeafGenerator>();
+    expr.appendLeafGenerator(list, 0, 0, false, false);
+  }
+
+  /**
+   * Reset the key builder for a new expression.
+   *
+   * @param expr
+   *          comparator expression
+   */
+  public void reset(ComparatorExpr expr) {
+    list.clear();
+    expr.appendLeafGenerator(list, 0, 0, false, false);
+  }
+
+  /**
+   * Generate the binary key for the input tuple
+   *
+   * @param t
+   *          input tuple
+   * @return A {@link BytesWritable} containing the binary sorting key for the
+   *         input tuple.
+   * @throws ExecException
+   *           if a leaf generator fails while reading the tuple.
+   */
+  public BytesWritable generateKey(Tuple t) throws ExecException {
+    out.reset();
+    for (LeafGenerator generator : list) {
+      generator.append(out, t);
+    }
+    BytesWritable ret = new BytesWritable();
+    // Only the first out.size() bytes of the internal buffer are valid.
+    ret.set(out.get(), 0, out.size());
+    return ret;
+  }
+
+  /**
+   * Illustrate how the key would be generated from source.
+   *
+   * @param ps
+   *          The output print stream.
+   */
+  public void illustrate(PrintStream ps) {
+    for (LeafGenerator generator : list) {
+      generator.illustrate(ps);
+    }
+    ps.print("\n");
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Base class of comparator expressions that are the leaves of the expression
+ * tree. A leaf reads one field of a tuple, selected by position, and writes
+ * its encoded bytes to an {@link EncodingOutputStream}.
+ */
+public abstract class LeafExpr extends ComparatorExpr {
+  /** Position of the field this leaf reads from the tuple. */
+  int index;
+
+  /**
+   * Constructor
+   *
+   * @param index
+   *          tuple position index
+   */
+  protected LeafExpr(int index) {
+    this.index = index;
+  }
+
+  /**
+   * Default illustrator for leaf expressions. Prints
+   * <code>(type, index, escapeLevel, comescLevel, complement)</code>.
+   *
+   * @param out
+   *          destination stream.
+   * @param escapeLevel
+   *          escape level of the generator being illustrated.
+   * @param comescLevel
+   *          complement escape level of the generator being illustrated.
+   * @param complement
+   *          complement flag of the generator being illustrated.
+   */
+  protected void illustrate(PrintStream out, int escapeLevel, int comescLevel,
+      boolean complement) {
+    out.printf("(%s, %d, %d, %d, %b)", getType(), index, escapeLevel,
+        comescLevel, complement);
+  }
+
+  /**
+   * Append the encoded form of this leaf's field to <code>out</code>.
+   *
+   * @param out
+   *          destination stream, already configured with escape parameters.
+   * @param tuple
+   *          source tuple.
+   * @throws ExecException
+   *           if the field cannot be read or encoded.
+   */
+  protected void append(EncodingOutputStream out, Tuple tuple)
+      throws ExecException {
+    // NOTE(review): a tuple shorter than expected contributes no bytes at
+    // all for this leaf - confirm this is the intended ordering for short
+    // tuples.
+    if (index >= tuple.size()) {
+      return;
+    }
+    Object o = tuple.get(index);
+    if (o == null) {
+      // A null field is written as a single 0x0 through the stream's normal
+      // write path, so the current escape/complement settings apply.
+      out.write(0x0);
+      return;
+    }
+    appendObject(out, o);
+  }
+
+  /**
+   * Write the encoded form of a non-null field value.
+   *
+   * @param out
+   *          destination stream.
+   * @param object
+   *          the field value; never null when called from {@link #append}.
+   * @throws ExecException
+   *           if the value cannot be encoded.
+   */
+  protected abstract void appendObject(EncodingOutputStream out, Object object)
+      throws ExecException;
+
+  /**
+   * @return the type name used by {@link #illustrate} and
+   *         {@link #toString(PrintStream)}.
+   */
+  protected abstract String getType();
+
+  /**
+   * @return true if {@link #appendLeafGenerator} must add an extra escape
+   *         level and a trailing bound when the caller asks for an explicit
+   *         bound.
+   */
+  protected abstract boolean implicitBound();
+
+  @Override
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    if (explicitBound && implicitBound()) {
+      // requiring explicit bound, while the leaf does not already have it, so
+      // we do escaping and add a bound at end.
+      // NOTE(review): the method name reads as the opposite of this comment;
+      // fixed-length leaves return false here and get no bound - confirm the
+      // intended meaning of implicitBound().
+      list.add(new LeafGenerator(this, el + 1, cel, c));
+      list.add(new LeafGenerator(null, el, cel, c));
+    } else {
+      list.add(new LeafGenerator(this, el, cel, c));
+    }
+  }
+
+  @Override
+  protected void toString(PrintStream out) {
+    out.printf("%s(%d)", getType(), index);
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Generate partial binary key for a leaf expression. Each generator carries
+ * the escape parameters to install on the output stream before its leaf
+ * writes.
+ */
+public final class LeafGenerator {
+  final LeafExpr leaf; // may be null to represent adding '0'
+  final int escapeLevel;
+  final int comescLevel;
+  final boolean complement;
+
+  /**
+   * @param leaf
+   *          The leaf expression
+   * @param el
+   *          escape level
+   * @param cel
+   *          complement escape level
+   * @param complement
+   *          complement the value
+   */
+  public LeafGenerator(LeafExpr leaf, int el, int cel, boolean complement) {
+    this.leaf = leaf;
+    this.escapeLevel = el;
+    this.comescLevel = cel;
+    this.complement = complement;
+  }
+
+  /**
+   * Configure <code>out</code> with this generator's escape parameters and
+   * append either the leaf's bytes or, for a null leaf, a single 0x0.
+   *
+   * @param out
+   *          destination stream.
+   * @param tuple
+   *          source tuple.
+   * @throws ExecException
+   *           if the leaf fails to read or encode its field.
+   */
+  void append(EncodingOutputStream out, Tuple tuple) throws ExecException {
+    out.setEscapeParams(escapeLevel, comescLevel, complement);
+    if (leaf == null) { // add a '\0'
+      out.write(0x0);
+    } else {
+      leaf.append(out, tuple);
+    }
+  }
+
+  /**
+   * Print a description of this generator to <code>out</code>.
+   *
+   * @param out
+   *          destination stream.
+   */
+  void illustrate(PrintStream out) {
+    if (leaf == null) {
+      out.printf("(%s, NA, %d, %d, %b)", "NULL", escapeLevel, comescLevel,
+          complement);
+    } else {
+      leaf.illustrate(out, escapeLevel, comescLevel, complement);
+    }
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+/**
+ * Fixed-length leaf expression for Long values.
+ */
+public final class LongExpr extends FixedLengthPrimitive {
+  /**
+   * @param index
+   *          index forwarded to the FixedLengthPrimitive constructor
+   */
+  public LongExpr(int index) {
+    super(index, Long.SIZE / Byte.SIZE);
+  }
+
+  /**
+   * Encode a Long into the buffer: the sign bit is flipped and the result is
+   * stored most significant byte first.
+   *
+   * @param b
+   *          destination buffer; must be exactly 8 bytes long
+   * @param o
+   *          the value, cast to Long
+   * @throws IllegalArgumentException
+   *           if the buffer length is not 8
+   */
+  protected void convertValue(byte[] b, Object o) {
+    final int width = Long.SIZE / Byte.SIZE;
+    if (b.length != width) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    long bits = ((Long) o).longValue() ^ Long.MIN_VALUE; // flip the sign bit
+    // Fill from the last slot backwards, consuming the low byte each round.
+    for (int pos = width - 1; pos >= 0; --pos) {
+      b[pos] = (byte) bits;
+      bits >>>= Byte.SIZE;
+    }
+  }
+
+  @Override
+  protected String getType() {
+    return "Long";
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Negate expression: wraps a single child expression and flips the
+ * complement flag handed down to it.
+ */
+public class NegateExpr extends ComparatorExpr {
+  protected final ComparatorExpr expr;
+
+  /**
+   * constructor
+   *
+   * @param expr
+   *          the child expression being negated
+   */
+  protected NegateExpr(ComparatorExpr expr) {
+    this.expr = expr;
+  }
+
+  /**
+   * Delegate to the child with the escape level and complement escape level
+   * swapped, the complement flag inverted, and an explicit bound always
+   * requested (the incoming explicitBound value is not used).
+   */
+  @Override
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    final boolean flipped = !c;
+    expr.appendLeafGenerator(list, cel, el, flipped, true);
+  }
+
+  /**
+   * @return the child expression
+   */
+  public ComparatorExpr childExpr() {
+    return expr;
+  }
+
+  @Override
+  protected void toString(PrintStream out) {
+    out.print("Negate(");
+    expr.toString(out);
+    out.print(")");
+  }
+
+  /**
+   * Make a negate expression
+   *
+   * @param expr
+   *          expression to negate
+   * @return the child of expr when expr is itself a NegateExpr, otherwise a
+   *         new NegateExpr wrapping expr
+   */
+  public static ComparatorExpr makeNegateExpr(ComparatorExpr expr) {
+    // Negating a negation: unwrap instead of nesting.
+    return (expr instanceof NegateExpr)
+        ? ((NegateExpr) expr).childExpr()
+        : new NegateExpr(expr);
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+/**
+ * Fixed-length leaf expression for Short values.
+ */
+public final class ShortExpr extends FixedLengthPrimitive {
+  /**
+   * @param index
+   *          index forwarded to the FixedLengthPrimitive constructor
+   */
+  public ShortExpr(int index) {
+    super(index, Short.SIZE / Byte.SIZE);
+  }
+
+  /**
+   * Encode a Short into the buffer: the sign bit is flipped and the result is
+   * stored high byte first.
+   *
+   * @param b
+   *          destination buffer; must be exactly 2 bytes long
+   * @param o
+   *          the value, cast to Short
+   * @throws IllegalArgumentException
+   *           if the buffer length is not 2
+   */
+  protected void convertValue(byte[] b, Object o) {
+    final int width = Short.SIZE / Byte.SIZE;
+    if (b.length != width) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    // Only the low 16 bits of the int are ever read below.
+    final int flipped = ((Short) o).shortValue() ^ (1 << (Short.SIZE - 1));
+    b[0] = (byte) (flipped >> Byte.SIZE);
+    b[1] = (byte) flipped;
+  }
+
+  @Override
+  protected String getType() {
+    return "Short";
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Leaf expression for String values; the string is written to the key
+ * stream as the bytes produced by {@link Text#encode(String)}.
+ */
+public final class StringExpr extends LeafExpr {
+  /**
+   * @param index
+   *          index forwarded to the LeafExpr constructor
+   */
+  public StringExpr(int index) {
+    super(index);
+  }
+
+  /**
+   * Encode the object (cast to String) and write the encoded bytes.
+   *
+   * @throws RuntimeException
+   *           wrapping any IOException raised while encoding or writing
+   */
+  @Override
+  protected void appendObject(EncodingOutputStream out, Object o) {
+    final String text = (String) o;
+    try {
+      final ByteBuffer encoded = Text.encode(text);
+      final byte[] backing = encoded.array();
+      final int length = encoded.limit();
+      out.write(backing, 0, length);
+    } catch (IOException e) {
+      throw new RuntimeException("Conversion error", e);
+    }
+  }
+
+  @Override
+  protected boolean implicitBound() {
+    return true;
+  }
+
+  @Override
+  protected String getType() {
+    return "String";
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Comparator expression made of an ordered list of child expressions.
+ */
+public class TupleExpr extends ComparatorExpr {
+  protected final List<ComparatorExpr> exprs;
+
+  /**
+   * @param exprs
+   *          children expressions, in order; the list is stored by reference
+   */
+  protected TupleExpr(List<ComparatorExpr> exprs) {
+    this.exprs = exprs;
+  }
+
+  /**
+   * Ask every child, in order, to append its leaf generators. All children
+   * but the last are always asked for an explicit bound; only the last child
+   * receives the caller's explicitBound flag.
+   */
+  @Override
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    int length = exprs.size();
+    for (int i = 0; i < length - 1; ++i) {
+      exprs.get(i).appendLeafGenerator(list, el, cel, c, true);
+    }
+    exprs.get(length - 1).appendLeafGenerator(list, el, cel, c, explicitBound);
+  }
+
+  /**
+   * Get the children expressions.
+   *
+   * @return The children expressions.
+   */
+  public List<ComparatorExpr> childrenExpr() {
+    return exprs;
+  }
+
+  @Override
+  protected void toString(PrintStream out) {
+    out.print("Tuple");
+    String sep = "(";
+    for (ComparatorExpr child : exprs) {
+      // print, not printf: sep is literal text, not a format string
+      out.print(sep);
+      child.toString(out);
+      sep = ", ";
+    }
+
+    out.print(")");
+  }
+
+  /**
+   * Make a tuple expression
+   *
+   * @param exprs
+   *          children expressions
+   * @return the sole element when exactly one expression is given; otherwise
+   *         a TupleExpr in which any TupleExpr argument is replaced by its
+   *         own children
+   * @throws IllegalArgumentException
+   *           if no expression is given
+   */
+  public static ComparatorExpr makeTupleExpr(ComparatorExpr... exprs) {
+    if (exprs.length == 0)
+      throw new IllegalArgumentException("Zero-length expression list");
+    if (exprs.length == 1) return exprs[0];
+    List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.length);
+    for (ComparatorExpr e : exprs) {
+      addFlattened(aggr, e);
+    }
+    return new TupleExpr(aggr);
+  }
+
+  /**
+   * Make a tuple expression
+   *
+   * @param exprs
+   *          children expressions
+   * @return the sole element when the collection holds exactly one
+   *         expression; otherwise a TupleExpr in which any TupleExpr element
+   *         is replaced by its own children
+   * @throws IllegalArgumentException
+   *           if the collection is empty
+   */
+  public static ComparatorExpr makeTupleComparator(
+      Collection<? extends ComparatorExpr> exprs) {
+    if (exprs.size() == 0)
+      throw new IllegalArgumentException("Zero-length expression list");
+    if (exprs.size() == 1) return exprs.iterator().next();
+    List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.size());
+    for (ComparatorExpr e : exprs) {
+      addFlattened(aggr, e);
+    }
+    return new TupleExpr(aggr);
+  }
+
+  /**
+   * Add e to aggr; a TupleExpr contributes its children instead of itself.
+   */
+  private static void addFlattened(List<ComparatorExpr> aggr, ComparatorExpr e) {
+    if (e instanceof TupleExpr) {
+      aggr.addAll(((TupleExpr) e).childrenExpr());
+    } else {
+      aggr.add(e);
+    }
+  }
+
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Utilities to allow PIG Storer to generate keys for sorted Zebra tables
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Implementation of PIG Storer/Loader Interfaces
+ */
+package org.apache.hadoop.zebra.pig;
+
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java Thu Nov 5 21:02:57 2009
@@ -26,57 +26,93 @@
import org.apache.hadoop.io.file.tfile.Utils;
import org.apache.pig.data.DataType;
+/**
+ * Zebra Column Type
+ */
public enum ColumnType {
+ /**
+ * Any type
+ */
ANY("any") {
public byte pigDataType() {
return DataType.UNKNOWN;
}
},
+ /**
+ * Integer
+ */
INT("int") {
public byte pigDataType() {
return DataType.INTEGER;
}
},
+ /**
+ * Long
+ */
LONG("long") {
public byte pigDataType() {
return DataType.LONG;
}
},
+ /**
+ * Float
+ */
FLOAT("float") {
public byte pigDataType() {
return DataType.FLOAT;
}
},
+ /**
+ * Double
+ */
DOUBLE("double") {
public byte pigDataType() {
return DataType.DOUBLE;
}
},
+ /**
+ * Boolean
+ */
BOOL("bool") {
public byte pigDataType() {
return DataType.BOOLEAN;
}
},
+ /**
+ * Collection
+ */
COLLECTION("collection") {
public byte pigDataType() {
return DataType.BAG;
}
},
+ /**
+ * Map
+ */
MAP("map") {
public byte pigDataType() {
return DataType.MAP;
}
},
+ /**
+ * Record
+ */
RECORD("record") {
public byte pigDataType() {
return DataType.TUPLE;
}
},
+ /**
+ * String
+ */
STRING("string") {
public byte pigDataType() {
return DataType.CHARARRAY;
}
},
+ /**
+ * Bytes
+ */
BYTES("bytes") {
public byte pigDataType() {
return DataType.BYTEARRAY;
@@ -102,6 +138,13 @@
return ColumnType.valueOf(name.toUpperCase());
}
+ /**
+ * Get the Zebra type from a Pig type
+ *
+ * @param dt Pig type
+ *
+ * @return Zebra type
+ */
public static ColumnType getTypeByPigDataType(byte dt) {
for (ColumnType ct : ColumnType.values()) {
if (ct.pigDataType() == dt) {
@@ -111,18 +154,40 @@
return null;
}
+ /**
+ * Get the Zebra type name for the passed in Zebra column type
+ *
+ * @param columntype Zebra column type
+ *
+ * @return string representation of the Zebra column type
+ */
public static String findTypeName(ColumnType columntype) {
return columntype.getName();
}
+ /**
+ * Get the Zebra type name for this Zebra column type
+ *
+ * @return string representation of the Zebra column type
+ */
public String getName() {
return name;
}
+ /**
+ * Same as getName
+ */
public String toString() {
return name;
}
+ /**
+ * check if a column type contains internal schema
+ *
+ * @param columnType Zebra column type
+ *
+ * @return true if the type is RECORD, MAP or COLLECTION
+ */
public static boolean isSchemaType(ColumnType columnType) {
return ((columnType == RECORD) || (columnType == MAP) || (columnType == COLLECTION));
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Thu Nov 5 21:02:57 2009
@@ -37,6 +37,9 @@
private static final long schemaVersion = 1L;
+ /**
+ * Column Schema in Schema
+ */
public static class ColumnSchema {
private String name;
private ColumnType type;
@@ -73,6 +76,8 @@
/**
* access function to get the column name
+ *
+ * @return name of the column
*/
public String getName() {
return name;
@@ -80,6 +85,8 @@
/**
* access function to get the column type
+ *
+ * @return column type
*/
public ColumnType getType() {
return type;
@@ -87,6 +94,8 @@
/**
* access function to get the column name
+ *
+ * @return column index in the parent schema
*/
public int getIndex() {
return index;
@@ -128,8 +137,8 @@
/**
* Compare two field schema for equality
*
- * @param fschema
- * @param fother
+ * @param fschema one column schema to be compared
+ * @param fother the other column schema to be compared
* @return true if ColumnSchema are equal, false otherwise
*/
public static boolean equals(ColumnSchema fschema, ColumnSchema fother) {
@@ -195,28 +204,47 @@
/*
* Returns the schema for the next level structure, in record, collection
* and map.
+ *
+ * @return Schema of the column
*/
public Schema getSchema() {
return schema;
}
}
- /*
- * helper class to parse a column name string one section at a time and find
+ /**
+ * Helper class to parse a column name string one section at a time and find
* the required type for the parsed part.
*/
public static class ParsedName {
- public String mName;
- int mKeyOffset; // the offset where the keysstring starts
- public ColumnType mDT = ColumnType.ANY; // parent's type
+ private String mName;
+ private int mKeyOffset; // the offset where the keysstring starts
+ private ColumnType mDT = ColumnType.ANY; // parent's type
+ /**
+ * Default ctor
+ */
public ParsedName() {
}
+ /**
+ * Set the name
+ *
+ * @param name
+ * column name string
+ */
public void setName(String name) {
mName = name;
}
+ /**
+ * Set the name and type
+ *
+ * @param name
+ * column name string
+ * @param pdt
+ * column type
+ */
public void setName(String name, ColumnType pdt) {
mName = name;
mDT = pdt;
@@ -227,21 +255,42 @@
mKeyOffset = keyStrOffset;
}
- void setDT(ColumnType dt) {
+ /**
+ * Set the column type
+ *
+ * @param dt
+ * column type to set
+ */
+ public void setDT(ColumnType dt) {
mDT = dt;
}
- ColumnType getDT() {
+ /**
+ * Get the column type
+ *
+ * @return column type
+ */
+ public ColumnType getDT() {
return mDT;
}
- String getName() {
+ /**
+ * Get the column name
+ *
+ * @return column name
+ */
+ public String getName() {
return mName;
}
+ /**
+ * Parse one sector of a fully qualified column name; also checks validity
+ * of use of the MAP and RECORD delimiters
+ *
+ * @param fs
+ * column schema this column name is checked against
+ */
public String parseName(Schema.ColumnSchema fs) throws ParseException {
- // parse one sector of a fq name, also checks sanity of MAP and RECORD
- // fields
int fieldIndex, hashIndex;
fieldIndex = mName.indexOf('.');
hashIndex = mName.indexOf('#');
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Zebra Schema
+ */
+package org.apache.hadoop.zebra.schema;
+
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java Thu Nov 5 21:02:57 2009
@@ -50,7 +50,6 @@
// tmp schema file name, used as a flag of unfinished CG
private final static String SCHEMA_FILE = ".schema";
- private final static String DEFAULT_COMPARATOR = "memcmp";
// schema version, should be same as BasicTable's most of the time
private final static Version SCHEMA_VERSION =
new Version((short) 1, (short) 1);
@@ -87,19 +86,18 @@
this.version = SCHEMA_VERSION;
}
- public CGSchema(Schema schema, boolean sorted) {
+ public CGSchema(Schema schema, boolean sorted, String comparator) {
this.sorted = sorted;
- this.comparator = (sorted) ? DEFAULT_COMPARATOR : "";
+ this.comparator = (sorted) ? (comparator == null ? SortInfo.DEFAULT_COMPARATOR : comparator) : "";
this.schema = schema;
this.version = SCHEMA_VERSION;
}
- public CGSchema(Schema schema, boolean sorted, String name, String serializer, String compressor, String owner, String group, short perm) {
- this(schema, sorted);
+ public CGSchema(Schema schema, boolean sorted, String comparator, String name, String serializer, String compressor, String owner, String group, short perm) {
+ this(schema, sorted, comparator);
this.name = name;
this.serializer = serializer;
this.compressor = compressor;
-// this.owner = owner;
this.group = group;
this.perm = perm;
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Thu Nov 5 21:02:57 2009
@@ -41,6 +41,9 @@
* insertions and queries respectively.
*/
public class Partition {
+ /**
+ * Storage split types
+ */
public enum SplitType {
NONE, RECORD, COLLECTION, MAP
}
@@ -317,7 +320,7 @@
*/
public CGSchema generateDefaultCGSchema(String name, String compressor,
String serializer, String owner, String group,
- short perm, final int defaultCGIndex) throws ParseException {
+ short perm, final int defaultCGIndex, String comparator) throws ParseException {
Schema schema = new Schema();
Schema.ColumnSchema fs;
for (int i = 0; i < mSchema.getNumColumns(); i++) {
@@ -369,13 +372,13 @@
}
}
CGSchema defaultSchema =
- (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, name, serializer, compressor, owner, group, perm));
+ (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, comparator, name, serializer, compressor, owner, group, perm));
return defaultSchema;
}
/**
* returns "hash key-to-(sub)column" map on a (sub)column which is MAP-split
- * aross different hash keys
+ * across different hash keys
*/
public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
Schema.ColumnSchema fs) {
@@ -661,17 +664,21 @@
private Projection mProjection = null;
private ArrayList<PartitionedColumn> mPCNeedTmpTuple = new ArrayList<PartitionedColumn>();
private ArrayList<PartitionedColumn> mPCNeedMap = new ArrayList<PartitionedColumn>();
+ private String comparator;
+ private boolean mSorted;
+ private SortInfo mSortInfo;
/*
* ctor used for LOAD
*/
- public Partition(Schema schema, Projection projection, String storage)
+ public Partition(Schema schema, Projection projection, String storage, String comparator)
throws ParseException, IOException {
mSchema = schema;
TableStorageParser sparser =
- new TableStorageParser(new StringReader(storage), this, mSchema);
+ new TableStorageParser(new StringReader(storage), this, mSchema, comparator);
mPartitionInfo = new PartitionInfo(schema);
- ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+ ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+ sparser.StorageSchema(cgschemas);
mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
mProjection = projection;
Schema projSchema = projection.getProjectionSchema();
@@ -747,15 +754,34 @@
/*
* ctor used by STORE
*/
- public Partition(final String schema, final String storage)
+ public Partition(final String schema, final String storage, String comparator, String sortColumns)
+ throws ParseException, IOException
+ {
+ TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
+ mSchema = parser.RecordSchema(null);
+ mSortInfo = SortInfo.parse(sortColumns, mSchema, comparator);
+ mSorted = (mSortInfo != null && mSortInfo.size() > 0);
+ this.comparator = (mSorted ? mSortInfo.getComparator() : "");
+ storeConst(storage);
+ }
+
+ public Partition(String schema, final String storage, String comparator)
throws ParseException, IOException
{
TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
mSchema = parser.RecordSchema(null);
+ this.comparator = comparator;
+ storeConst(storage);
+ }
+
+ private void storeConst(final String storage)
+ throws ParseException, IOException
+ {
mPartitionInfo = new PartitionInfo(mSchema);
TableStorageParser sparser =
- new TableStorageParser(new StringReader(storage), this, mSchema);
- ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+ new TableStorageParser(new StringReader(storage), this, mSchema, this.comparator);
+ ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+ sparser.StorageSchema(cgschemas);
mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
int size = mSchema.getNumColumns();
PartitionInfo.ColumnMappingEntry cgindex;
@@ -805,6 +831,18 @@
mPCNeedMap.get(i).createMap();
}
+ public SortInfo getSortInfo() {
+ return mSortInfo;
+ }
+
+ public boolean isSorted() {
+ return mSorted;
+ }
+
+ public String getComparator() {
+ return comparator;
+ }
+
/**
* returns table schema
*/
@@ -856,10 +894,10 @@
{
if (projectedKeys != null)
{
- pn.mDT = ColumnType.MAP;
+ pn.setDT(ColumnType.MAP);
map = true;
} else {
- pn.mDT = ColumnType.ANY;
+ pn.setDT(ColumnType.ANY);
PartitionInfo.ColumnMappingEntry cme;
for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator(); it.hasNext(); )
{
@@ -985,7 +1023,7 @@
pn.parseName(fs);
Schema.ParsedName oripn = new Schema.ParsedName();
for (int i = 0; i < schema.getNumColumns(); i++) {
- oripn.setName(new String(pn.mName), pn.mDT);
+ oripn.setName(new String(pn.getName()), pn.getDT());
child = schema.getColumn(i);
if (getCGIndex(child) == null) {
// not a CG: go one level lower
@@ -1038,7 +1076,7 @@
PartitionedColumn parent, Schema.ColumnSchema child, int i,
int fi, HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices) throws IOException {
CGEntry cgentry;
- if (pn.mDT == ColumnType.ANY) {
+ if (pn.getDT() == ColumnType.ANY) {
// this subtype is MAP split and the projection is on the whole MAP:
// => need to add stitches for all split keys
@@ -1236,7 +1274,6 @@
while (it.hasNext())
it.next().getValue().read();
- TypesUtils.resetTuple(t);
// dispatch
mExecs.get(mStitchSize - 1).setRecord(t);
@@ -1313,9 +1350,9 @@
public CGSchema generateDefaultCGSchema(String name, String compressor, String serializer,
String owner, String group, short perm,
- final int defaultCGIndex) throws ParseException {
+ final int defaultCGIndex, String comparator) throws ParseException {
return mPartitionInfo.generateDefaultCGSchema(name, compressor, serializer,owner, group, perm,
- defaultCGIndex);
+ defaultCGIndex, comparator);
}
public void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.IOException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+/**
+ * Sort-related information of a Zebra table: the sort column names, their
+ * indices and types in the schema, and the name of the comparator in use.
+ */
+public class SortInfo {
+  public static final String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
+  private static final String SORTED_COLUMN_DELIMITER = ",";
+  // NOTE(review): not read anywhere in this class; confirm it is still needed
+  private boolean global = true;
+  private final String[] columns;
+  private final int[] indices;
+  private final ColumnType[] types;
+  private final String comparator;
+  private final Schema schema;
+
+  private SortInfo(String[] columns, int[] sortIndices, ColumnType[] sortColTypes, String comparator, Schema schema)
+  {
+    this.columns = columns;
+    this.indices = sortIndices;
+    this.types = sortColTypes;
+    this.comparator = comparator;
+    this.schema = schema;
+  }
+
+  /**
+   * Get an array of the sorted column names with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of strings of sorted column names
+   */
+  public String[] getSortColumnNames() {
+    return columns;
+  }
+
+  /**
+   * Get an array of zebra types of the sorted columns with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of zebra types of the sorted columns
+   */
+  public ColumnType[] getSortColumnTypes() {
+    return types;
+  }
+
+  /**
+   * Get an array of column indices in schema of the sorted columns with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of schema column indices of the sorted columns
+   */
+  public int[] getSortIndices() {
+    return indices;
+  }
+
+  /**
+   * Get the number of sorted columns
+   *
+   * @return number of sorted columns
+   */
+  public int size() {
+    return (columns == null ? 0 : columns.length);
+  }
+
+  /**
+   * Get the comparator name
+   *
+   * @return comparator name
+   */
+  public String getComparator() {
+    return comparator;
+  }
+
+  /**
+   * Check if the given sort columns and comparator match this sort info.
+   * Each column name is trimmed before use, as done in parse().
+   *
+   * @param sortcolumns
+   *          comma-separated sort column names
+   * @param comparator
+   *          comparator name, compared as given
+   * @return true if one's sort columns is equal to a leading portion of the
+   *         other's and the comparator names are equal or both null; false
+   *         otherwise, including when sortcolumns is null or blank
+   * @throws IOException
+   *           if a given sort column does not exist in schema
+   */
+  public boolean equals(String sortcolumns, String comparator) throws IOException {
+    if (sortcolumns == null || sortcolumns.trim().isEmpty())
+    {
+      return false;
+    }
+    String[] columns = sortcolumns.trim().split(SORTED_COLUMN_DELIMITER);
+    for (int i = 0; i < columns.length; i++)
+    {
+      // names are stored trimmed by parse(), so trim before any lookup
+      columns[i] = columns[i].trim();
+      if (schema.getColumn(columns[i]) == null)
+        throw new IOException(columns[i] + " does not exist in schema");
+    }
+    // only the leading portion shared by both column lists is compared
+    int common = Math.min(this.columns.length, columns.length);
+    for (int i = 0; i < common; i++)
+    {
+      if (!this.columns[i].equals(columns[i]))
+        return false;
+    }
+    if (this.comparator == null || comparator == null)
+      return (this.comparator == null && comparator == null);
+    return this.comparator.equals(comparator);
+  }
+
+  /**
+   * Build a SortInfo object from sort column names, schema, and comparator
+   *
+   * @param sortStr
+   *          comma-separated sort column names
+   * @param schema
+   *          schema of the Zebra table for the sort columns
+   * @param comparator
+   *          comparator name; DEFAULT_COMPARATOR is used if null or empty
+   * @return newly built SortInfo object, or null if sortStr is null or blank
+   * @throws IOException
+   *           if a sort column does not exist in schema
+   */
+  public static SortInfo parse(String sortStr, Schema schema, String comparator) throws IOException
+  {
+    if (sortStr == null || sortStr.trim().isEmpty())
+    {
+      return null;
+    }
+    String[] sortedColumns = sortStr.trim().split(SORTED_COLUMN_DELIMITER);
+    int[] sortColIndices = new int[sortedColumns.length];
+    ColumnType[] sortColTypes = new ColumnType[sortedColumns.length];
+    for (int i = 0; i < sortedColumns.length; i++)
+    {
+      sortedColumns[i] = sortedColumns[i].trim();
+      // sanity check the sort column's existence
+      Schema.ColumnSchema cs = schema.getColumn(sortedColumns[i]);
+      if (cs == null)
+        throw new IOException(sortedColumns[i] + " does not exist in schema");
+      sortColIndices[i] = schema.getColumnIndex(sortedColumns[i]);
+      sortColTypes[i] = cs.getType();
+    }
+    String comparatorInUse = (sortedColumns.length > 0 ?
+        (comparator == null || comparator.isEmpty() ?
+        DEFAULT_COMPARATOR : comparator) : null);
+    return new SortInfo(sortedColumns, sortColIndices, sortColTypes, comparatorInUse, schema);
+  }
+
+  /**
+   * Build a string of comma-separated sort column names from an array of sort column names
+   *
+   * @param names an array of sort column names
+   * @return a string of comma-separated sort column names, or null if names
+   *         is null or empty
+   */
+  public static String toSortString(String[] names)
+  {
+    if (names == null || names.length == 0)
+      return null;
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < names.length; i++)
+    {
+      if (i > 0)
+        sb.append(SORTED_COLUMN_DELIMITER);
+      sb.append(names[i]);
+    }
+    return sb.toString();
+  }
+}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java Thu Nov 5 21:02:57 2009
@@ -30,14 +30,14 @@
import org.apache.hadoop.zebra.parser.ParseException;
+/**
+ * This class extracts a subfield from a column or subcolumn stored
+ * in entirety on disk
+ * It should be used only by readers whose serializers do not
+ * support projection
+ */
public class SubColumnExtraction {
- /**
- * This class extracts a subfield from a column or subcolumn stored
- * in entirety on disk
- * It should be used only by readers whose serializers do not
- * support projection
- */
- public static class SubColumn {
+ static class SubColumn {
Schema physical;
Projection projection;
ArrayList<SplitColumn> exec = null;
@@ -77,16 +77,17 @@
pn.setName(name);
fs = physical.getColumnSchema(pn);
if (keySet != null)
- pn.mDT = ColumnType.MAP;
+ pn.setDT(ColumnType.MAP);
if (fs == null)
continue; // skip non-existing field
j = fs.getIndex();
- if (pn.mDT == ColumnType.MAP || pn.mDT == ColumnType.RECORD || pn.mDT == ColumnType.COLLECTION)
+ ColumnType ct = pn.getDT();
+ if (ct == ColumnType.MAP || ct == ColumnType.RECORD || ct == ColumnType.COLLECTION)
{
// record/map subfield is expected
- sc = new SplitColumn(j, pn.mDT);
- if (pn.mDT == ColumnType.MAP)
+ sc = new SplitColumn(j, ct);
+ if (ct == ColumnType.MAP)
sclist.add(sc);
exec.add(sc); // breadth-first
// (i, j) represents the mapping between projection and physical schema
@@ -105,18 +106,19 @@
* build the split executions
*/
private void buildSplit(SplitColumn parent, Schema.ColumnSchema fs,
- Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
+ final Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
{
// recursive call to get the next level schema
- if (pn.mDT != fs.getType())
+ ColumnType ct = pn.getDT();
+ if (ct != fs.getType())
throw new ParseException(fs.getName()+" is not of proper type.");
String prefix;
int fieldIndex;
SplitColumn sc;
- Partition.SplitType callerDT = (pn.mDT == ColumnType.MAP ? Partition.SplitType.MAP :
- (pn.mDT == ColumnType.RECORD ? Partition.SplitType.RECORD :
- (pn.mDT == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
+ Partition.SplitType callerDT = (ct == ColumnType.MAP ? Partition.SplitType.MAP :
+ (ct == ColumnType.RECORD ? Partition.SplitType.RECORD :
+ (ct == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
Partition.SplitType.NONE)));
prefix = pn.parseName(fs);
Schema schema = fs.getSchema();
@@ -133,11 +135,12 @@
fieldIndex = 0;
}
- if (pn.mDT != ColumnType.ANY)
+ ct = pn.getDT();
+ if (ct != ColumnType.ANY)
{
// record subfield is expected
- sc = new SplitColumn(fieldIndex, pn.mDT);
- if (pn.mDT == ColumnType.MAP)
+ sc = new SplitColumn(fieldIndex, ct);
+ if (ct == ColumnType.MAP)
sclist.add(sc);
exec.add(sc); // breadth-first
buildSplit(sc, fs, pn, projIndex, null);
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt Thu Nov 5 21:02:57 2009
@@ -30,7 +30,7 @@
import org.apache.hadoop.zebra.types.*;
public class TableStorageParser {
- public TableStorageParser(java.io.Reader in, Partition partition, Schema schema) { this(in); mSchema = schema; this.partition = partition;}
+ public TableStorageParser(java.io.Reader in, Partition partition, Schema schema, String comparator) { this(in); mSchema = schema; this.partition = partition; this.comparator = comparator; }
private Schema mSchema;
private int mDefaultCGIndex = -1;
private String mName = null;
@@ -39,6 +39,7 @@
private short mPerm = -1;
private int mCGCount = 0;
private Partition partition;
+ private String comparator = null;
}
PARSER_END(TableStorageParser)
@@ -54,9 +55,10 @@
TOKEN : { <COMPRESSOR : "lzo" | "gz"> }
TOKEN : { <SERIALIZER : ("pig" | "avro")> }
-TOKEN : { <ORDER : "order by"> }
TOKEN : { <COMPRESS : "compress by"> }
TOKEN : { <SERIALIZE : "serialize by"> }
+TOKEN : { <ASC : "ASC"> }
+TOKEN : { <DESC: "DESC"> }
TOKEN : { <SECURE : "secure by"> }
TOKEN : { <USER : "user"> }
TOKEN : { <GROUP : "group"> }
@@ -64,8 +66,6 @@
TOKEN : { <AS : "as"> }
-
-
TOKEN:
{
<#LETTER : ["a"-"z", "A"-"Z"] >
@@ -77,62 +77,17 @@
| <SHORT : (<OCTAL>){3} >
}
-ArrayList<CGSchema> StorageSchema() throws ParseException :
+void StorageSchema(ArrayList<CGSchema> s) throws ParseException :
{
- ArrayList<CGSchema> s = new ArrayList();
CGSchema fs;
CGSchema defaultSchema;
}
{
try {
- LOOKAHEAD(2) <EOF>
- {
- defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, 0);
- if (defaultSchema != null)
- s.add(defaultSchema);
-
- // check column group names, add system created names when necessary;
- HashSet<String> cgNames = new HashSet<String>();
- ArrayList<CGSchema> unnamed = new ArrayList<CGSchema>();
- for (int i = 0; i < s.size(); i++) {
- CGSchema cgSchema = s.get(i);
- String str = cgSchema.getName();
- if (str != null) {
- if (!cgNames.add(str)) {
- throw new ParseException("Duplicate column group names.");
- }
- } else {
- unnamed.add(cgSchema);
- }
- }
-
- int digits = 1;
- int total = unnamed.size();
- while (total >= 10) {
- ++digits;
- total /= 10;
- }
- String formatString = "%0" + digits + "d";
-
- int idx = 0;
- for (int i = 0; i < unnamed.size(); i++) {
- CGSchema cgSchema = unnamed.get(i);
- String str = null;
- while (true) {
- str = "CG" + String.format(formatString, idx++);
- if (!cgNames.contains(str)) {
- break;
- }
- }
- cgSchema.setName(str);
- }
- return s;
- }
- |
fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);}
(";" fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);})* <EOF>
{
- defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex);
+ defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex, comparator);
if (defaultSchema != null)
s.add(defaultSchema);
@@ -171,13 +126,29 @@
}
cgSchema.setName(str);
}
- return s;
+ return;
}
} catch (TokenMgrError e) {
throw new ParseException(e.getMessage());
}
}
+/**
+ * Parse a sort order keyword.
+ *
+ * @return true if the ASC token is matched, false if the DESC token is matched
+ */
+boolean ascdsc() throws ParseException :
+{
+}
+{
+  <ASC>
+  { return true; }
+|
+  <DESC>
+  { return false; }
+}
+
CGSchema FieldSchema() throws ParseException:
{
Token t1 = null, t2 = null;
@@ -310,8 +281,9 @@
mOwner = owner;
mGroup = group;
mPerm = perm;
- } else
- cs = new CGSchema(fs, false, name, serializer, compressor, owner, group, perm);
+ } else {
+ cs = new CGSchema(fs, false, comparator, name, serializer, compressor, owner, group, perm);
+ }
return cs;
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java Thu Nov 5 21:02:57 2009
@@ -212,8 +212,8 @@
* @throws IOException
*
*/
- public static void formatTuple(Tuple tuple, String projection) throws IOException {
- Tuple one = createTuple(Projection.getNumColumns(projection));
+ public static void formatTuple(Tuple tuple, int ncols) throws IOException {
+ Tuple one = createTuple(ncols);
tuple.reference(one);
return;
/*
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Thu Nov 5 21:02:57 2009
@@ -81,14 +81,13 @@
return String.format("%s%09d", prefix, random.nextInt(max));
}
- static int createBasicTable(int parts, int rows, String strSchema, String storage,
- Path path, boolean properClose, boolean sorted) throws IOException {
+ static int createBasicTable(int parts, int rows, String strSchema, String storage, String sortColumns,
+ Path path, boolean properClose) throws IOException {
if (fs.exists(path)) {
BasicTable.drop(path, conf);
}
- BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage,
- sorted, conf);
+ BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage, sortColumns, null, conf);
writer.finish();
int total = 0;
@@ -96,6 +95,7 @@
String colNames[] = schema.getColumns();
Tuple tuple = TypesUtils.createTuple(schema);
+ boolean sorted = writer.isSorted();
for (int i = 0; i < parts; ++i) {
writer = new BasicTable.Writer(path, conf);
TableInserter inserter = writer.getInserter(
@@ -230,10 +230,10 @@
}
static void doReadWrite(Path path, int parts, int rows, String schema,
- String storage, String projection, boolean properClose, boolean sorted)
+ String storage, String sortColumns, String projection, boolean properClose, boolean sorted)
throws IOException, ParseException {
- int totalRows = createBasicTable(parts, rows, schema, storage, path,
- properClose, sorted);
+ int totalRows = createBasicTable(parts, rows, schema, storage, sortColumns, path,
+ properClose);
if (rows == 0) {
Assert.assertEquals(rows, 0);
}
@@ -248,17 +248,17 @@
public void testMultiCGs() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableMultiCGs");
- doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", "SF_f,SF_a,SF_c,SF_d", true, false);
+ doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", null, "SF_f,SF_a,SF_c,SF_d", true, false);
}
public void testCornerCases() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableCornerCases");
- doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", false, false);
- doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, false);
- doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, true);
- doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", false, false);
- doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, false);
- doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, true);
+ doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+ doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+ doReadWrite(path, 0, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
+ doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+ doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+ doReadWrite(path, 2, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
}
static int doReadOnly(TableScanner scanner) throws IOException, ParseException {
@@ -289,7 +289,7 @@
@Test
public void testNullSplits() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableNullSplits");
- int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+ int totalRows = createBasicTable(2, 250, "a, b, c", "", "a", path, true);
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
reader.setProjection("a,d,c,f");
Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
@@ -301,14 +301,14 @@
@Test
public void testNegativeSplits() throws IOException, ParseException {
Path path = new Path(rootPath, "TestNegativeSplits");
- int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+ int totalRows = createBasicTable(2, 250, "a, b, c", "", "", path, true);
rangeSplitBasicTable(-1, totalRows, "a,d,c,f", path);
}
@Test
public void testMetaBlocks() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableMetaBlocks");
- createBasicTable(3, 100, "a, b, c", "", path, false, false);
+ createBasicTable(3, 100, "a, b, c", "", null, path, false);
BasicTable.Writer writer = new BasicTable.Writer(path, conf);
BytesWritable meta1 = makeKey(1234);
BytesWritable meta2 = makeKey(9876);
@@ -350,8 +350,8 @@
@Test
public void testNormalCases() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableNormal");
- doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", false, false);
- doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, false);
- doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, true);
+ doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", false, false);
+ doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
+ doReadWrite(path, 2, 250, "a, b, c", "", "a", "a, d, c, f", true, true);
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java Thu Nov 5 21:02:57 2009
@@ -72,7 +72,7 @@
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();