Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [4/23] - in /hadoop/pig/branches/load-store-redesign: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebra...
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Generate partial binary key for a leaf expression.
+ */
+public final class LeafGenerator {
+  final LeafExpr leaf; // may be null to represent adding '\0'
+ final int escapeLevel;
+ final int comescLevel;
+ final boolean complement;
+
+ /**
+ * @param leaf
+ * The leaf expression
+ * @param el
+ * escape level
+ * @param cel
+ * complement escape level
+ * @param complement
+ * complement the value
+ */
+ public LeafGenerator(LeafExpr leaf, int el, int cel, boolean complement) {
+ this.leaf = leaf;
+ this.escapeLevel = el;
+ this.comescLevel = cel;
+ this.complement = complement;
+ }
+
+ void append(EncodingOutputStream out, Tuple tuple) throws ExecException {
+ out.setEscapeParams(escapeLevel, comescLevel, complement);
+ if (leaf == null) { // add a '\0'
+ out.write(0x0);
+ } else {
+ leaf.append(out, tuple);
+ }
+ }
+
+ void illustrate(PrintStream out) {
+ if (leaf == null) {
+ out.printf("(%s, NA, %d, %d, %b)", "NULL", escapeLevel, comescLevel,
+ complement);
+ } else {
+ leaf.illustrate(out, escapeLevel, comescLevel, complement);
+ }
+ }
+}
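
A note on the null-leaf case above: the '\0' that append() writes acts as a bound between variable-length key parts. A minimal standalone sketch (plain Java, not part of this commit) of why such a bound matters; the real comparator also applies the escape levels above to bytes inside a field, which this sketch ignores:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class SeparatorDemo {
      // concatenate UTF-8 parts, writing a 0x0 bound after each one, the
      // way a null-leaf LeafGenerator bounds the preceding field
      static byte[] key(String... parts) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String p : parts) {
          out.write(p.getBytes("UTF-8"));
          out.write(0x0);
        }
        return out.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        // with the bound the keys differ; without it both would encode as "abc"
        System.out.println(Arrays.equals(key("ab", "c"), key("a", "bc"))); // false
      }
    }
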
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class LongExpr extends FixedLengthPrimitive {
+ public LongExpr(int index) {
+ super(index, Long.SIZE / Byte.SIZE);
+ }
+
+ protected void convertValue(byte[] b, Object o) {
+ if (b.length != Long.SIZE / Byte.SIZE) {
+ throw new IllegalArgumentException("Incorrect buffer size");
+ }
+ long l = (Long) o;
+ l ^= 1L << (Long.SIZE - 1);
+ b[7] = (byte) l;
+ b[6] = (byte) (l >> 8);
+ b[5] = (byte) (l >> 16);
+ b[4] = (byte) (l >> 24);
+ b[3] = (byte) (l >> 32);
+ b[2] = (byte) (l >> 40);
+ b[1] = (byte) (l >> 48);
+ b[0] = (byte) (l >> 56);
+ }
+
+ @Override
+ protected String getType() {
+ return "Long";
+ }
+}
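
A standalone sketch (not part of this commit) of why convertValue XORs the sign bit before writing the value big-endian: after the flip, unsigned lexicographic comparison of the raw bytes agrees with signed long ordering, which is what a raw-byte sorted key needs. ShortExpr later in this commit applies the same scheme to 2-byte values.

    public class SignFlipDemo {
      // same transformation as LongExpr.convertValue: flip the sign bit,
      // then emit the value big-endian (most significant byte first)
      static byte[] encode(long l) {
        l ^= 1L << (Long.SIZE - 1);
        byte[] b = new byte[8];
        for (int i = 7; i >= 0; --i) {
          b[i] = (byte) l;
          l >>>= 8;
        }
        return b;
      }

      // unsigned byte-wise comparison, as raw sorted-table keys are compared
      static int compare(byte[] x, byte[] y) {
        for (int i = 0; i < x.length; ++i) {
          int d = (x[i] & 0xff) - (y[i] & 0xff);
          if (d != 0) return d;
        }
        return 0;
      }

      public static void main(String[] args) {
        long[] vals = { Long.MIN_VALUE, -1L, 0L, 1L, Long.MAX_VALUE };
        for (int i = 0; i + 1 < vals.length; ++i) {
          // every adjacent pair stays in order after encoding: prints true four times
          System.out.println(compare(encode(vals[i]), encode(vals[i + 1])) < 0);
        }
      }
    }
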
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Negate expression
+ */
+public class NegateExpr extends ComparatorExpr {
+ protected final ComparatorExpr expr;
+
+ /**
+ * constructor
+ *
+ * @param expr
+ */
+ protected NegateExpr(ComparatorExpr expr) {
+ this.expr = expr;
+ }
+
+ @Override
+ protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+ boolean c, boolean explicitBound) {
+ expr.appendLeafGenerator(list, cel, el, !c, true);
+ }
+
+ /**
+ * @return the child expression
+ */
+ public ComparatorExpr childExpr() {
+ return expr;
+ }
+
+ @Override
+ protected void toString(PrintStream out) {
+ out.print("Negate(");
+ expr.toString(out);
+ out.print(")");
+ }
+
+ /**
+ * Make a negate expression
+ */
+ public static ComparatorExpr makeNegateExpr(ComparatorExpr expr) {
+ if (expr instanceof NegateExpr) {
+ NegateExpr negateExpr = (NegateExpr) expr;
+ return negateExpr.childExpr();
+ }
+
+ return new NegateExpr(expr);
+ }
+}
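
A hypothetical usage sketch (not from this commit) of the factory's double-negation collapse; the constructor is protected, so callers go through makeNegateExpr. Internally, appendLeafGenerator realizes descending order by swapping the escape and complement-escape levels and flipping the complement flag for the whole subtree.

    import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
    import org.apache.hadoop.zebra.pig.comparator.LongExpr;
    import org.apache.hadoop.zebra.pig.comparator.NegateExpr;

    public class NegateDemo {
      public static void main(String[] args) {
        ComparatorExpr asc  = new LongExpr(0);                 // field 0, ascending
        ComparatorExpr desc = NegateExpr.makeNegateExpr(asc);  // descending order
        ComparatorExpr back = NegateExpr.makeNegateExpr(desc); // factory unwraps the child
        System.out.println(back == asc);                       // true: negations never stack
      }
    }
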
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class ShortExpr extends FixedLengthPrimitive {
+ public ShortExpr(int index) {
+ super(index, Short.SIZE / Byte.SIZE);
+ }
+
+ protected void convertValue(byte[] b, Object o) {
+ if (b.length != Short.SIZE / Byte.SIZE) {
+ throw new IllegalArgumentException("Incorrect buffer size");
+ }
+ short l = (Short) o;
+ l ^= 1 << (Short.SIZE - 1);
+ b[1] = (byte) l;
+ b[0] = (byte) (l >> 8);
+ }
+
+ @Override
+ protected String getType() {
+ return "Short";
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Text;
+
+public final class StringExpr extends LeafExpr {
+ public StringExpr(int index) {
+ super(index);
+ }
+
+ @Override
+ protected void appendObject(EncodingOutputStream out, Object o) {
+ String s = (String) o;
+ try {
+ ByteBuffer bytes = Text.encode(s);
+ out.write(bytes.array(), 0, bytes.limit());
+ } catch (IOException e) {
+ throw new RuntimeException("Conversion error", e);
+ }
+ }
+
+ @Override
+ protected boolean implicitBound() {
+ return true;
+ }
+
+ @Override
+ protected String getType() {
+ return "String";
+ }
+}
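
Text.encode is used here because UTF-8 byte sequences sort in the same order as the code points they encode, so unsigned byte comparison of the raw key preserves string order by code point. A quick standalone check (not part of the commit):

    public class Utf8OrderDemo {
      public static void main(String[] args) throws Exception {
        byte[] a = "z".getBytes("UTF-8");      // U+007A encodes to one byte, 0x7a
        byte[] b = "\u00e9".getBytes("UTF-8"); // U+00E9 encodes to two bytes, 0xc3 0xa9
        // unsigned comparison of the first differing byte: 0x7a < 0xc3,
        // matching the code point order U+007A < U+00E9
        System.out.println((a[0] & 0xff) < (b[0] & 0xff)); // true
      }
    }
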
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+public class TupleExpr extends ComparatorExpr {
+ protected final List<ComparatorExpr> exprs;
+
+ protected TupleExpr(List<ComparatorExpr> exprs) {
+ this.exprs = exprs;
+ }
+
+ @Override
+ protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+ boolean c, boolean explicitBound) {
+ int length = exprs.size();
+ for (int i = 0; i < length - 1; ++i) {
+ ComparatorExpr e = exprs.get(i);
+ e.appendLeafGenerator(list, el, cel, c, true);
+ }
+ exprs.get(length - 1).appendLeafGenerator(list, el, cel, c, explicitBound);
+ }
+
+ /**
+ * Get the children expressions.
+ *
+ * @return The children expressions.
+ */
+ public List<ComparatorExpr> childrenExpr() {
+ return exprs;
+ }
+
+ @Override
+ protected void toString(PrintStream out) {
+ out.print("Tuple");
+ String sep = "(";
+ for (Iterator<ComparatorExpr> it = exprs.iterator(); it.hasNext();) {
+      out.print(sep);
+ it.next().toString(out);
+ sep = ", ";
+ }
+
+ out.print(")");
+ }
+
+ /**
+ * Make a tuple expression
+ *
+ * @param exprs
+ * children expressions
+ * @return a comparator expression
+ */
+ public static ComparatorExpr makeTupleExpr(ComparatorExpr... exprs) {
+ if (exprs.length == 0)
+ throw new IllegalArgumentException("Zero-length expression list");
+ if (exprs.length == 1) return exprs[0];
+ List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.length);
+ for (ComparatorExpr e : exprs) {
+ if (e instanceof TupleExpr) {
+ aggr.addAll(((TupleExpr) e).childrenExpr());
+ } else {
+ aggr.add(e);
+ }
+ }
+ return new TupleExpr(aggr);
+ }
+
+ /**
+ * Make a tuple expression
+ *
+ * @param exprs
+ * children expressions
+ * @return a comparator expression
+ */
+ public static ComparatorExpr makeTupleComparator(
+ Collection<? extends ComparatorExpr> exprs) {
+ if (exprs.size() == 0)
+ throw new IllegalArgumentException("Zero-length expression list");
+ if (exprs.size() == 1) return exprs.iterator().next();
+ List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.size());
+ for (Iterator<? extends ComparatorExpr> it = exprs.iterator(); it.hasNext();) {
+ ComparatorExpr e = it.next();
+ if (e instanceof TupleExpr) {
+ aggr.addAll(((TupleExpr) e).childrenExpr());
+ } else {
+ aggr.add(e);
+ }
+ }
+ return new TupleExpr(aggr);
+ }
+
+}
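
A hypothetical usage sketch (not from this commit) of the flattening done by makeTupleExpr: nested tuples are inlined, and a single-expression tuple is unwrapped to its only child rather than wrapped.

    import java.util.List;

    import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
    import org.apache.hadoop.zebra.pig.comparator.LongExpr;
    import org.apache.hadoop.zebra.pig.comparator.StringExpr;
    import org.apache.hadoop.zebra.pig.comparator.TupleExpr;

    public class TupleDemo {
      public static void main(String[] args) {
        // Tuple(String(0), Tuple(Long(1), Long(2))) flattens to
        // Tuple(String(0), Long(1), Long(2))
        ComparatorExpr e = TupleExpr.makeTupleExpr(
            new StringExpr(0),
            TupleExpr.makeTupleExpr(new LongExpr(1), new LongExpr(2)));
        List<ComparatorExpr> children = ((TupleExpr) e).childrenExpr();
        System.out.println(children.size());            // 3

        // a one-expression "tuple" is not wrapped at all
        ComparatorExpr single = TupleExpr.makeTupleExpr(new LongExpr(4));
        System.out.println(single instanceof LongExpr); // true
      }
    }
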
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Utilities to allow the Pig Storer to generate keys for sorted Zebra tables
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Implementation of the Pig Storer/Loader interfaces
+ */
+package org.apache.hadoop.zebra.pig;
+
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java Tue Nov 24 19:54:19 2009
@@ -23,60 +23,96 @@
import java.io.IOException;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.Utils;
import org.apache.pig.data.DataType;
+/**
+ * Zebra Column Type
+ */
public enum ColumnType {
+ /**
+ * Any type
+ */
ANY("any") {
public byte pigDataType() {
return DataType.UNKNOWN;
}
},
+ /**
+ * Integer
+ */
INT("int") {
public byte pigDataType() {
return DataType.INTEGER;
}
},
+ /**
+ * Long
+ */
LONG("long") {
public byte pigDataType() {
return DataType.LONG;
}
},
+ /**
+ * Float
+ */
FLOAT("float") {
public byte pigDataType() {
return DataType.FLOAT;
}
},
+ /**
+ * Double
+ */
DOUBLE("double") {
public byte pigDataType() {
return DataType.DOUBLE;
}
},
+ /**
+ * Boolean
+ */
BOOL("bool") {
public byte pigDataType() {
return DataType.BOOLEAN;
}
},
+ /**
+ * Collection
+ */
COLLECTION("collection") {
public byte pigDataType() {
return DataType.BAG;
}
},
+ /**
+ * Map
+ */
MAP("map") {
public byte pigDataType() {
return DataType.MAP;
}
},
+ /**
+ * Record
+ */
RECORD("record") {
public byte pigDataType() {
return DataType.TUPLE;
}
},
+ /**
+ * String
+ */
STRING("string") {
public byte pigDataType() {
return DataType.CHARARRAY;
}
},
+ /**
+ * Bytes
+ */
BYTES("bytes") {
public byte pigDataType() {
return DataType.BYTEARRAY;
@@ -102,6 +138,13 @@
return ColumnType.valueOf(name.toUpperCase());
}
+ /**
+ * Get the Zebra type from a Pig type
+ *
+ * @param dt Pig type
+ *
+ * @return Zebra type
+ */
public static ColumnType getTypeByPigDataType(byte dt) {
for (ColumnType ct : ColumnType.values()) {
if (ct.pigDataType() == dt) {
@@ -111,18 +154,40 @@
return null;
}
+ /**
+ * Get the Zebra type name for the passed in Zebra column type
+ *
+ * @param columntype Zebra column type
+ *
+ * @return string representation of the Zebra column type
+ */
public static String findTypeName(ColumnType columntype) {
return columntype.getName();
}
+ /**
+ * Get the Zebra type name for this Zebra column type
+ *
+ * @return string representation of the Zebra column type
+ */
public String getName() {
return name;
}
+ /**
+   * Same as getName
+ */
public String toString() {
return name;
}
+ /**
+   * Check if a column type contains an internal schema
+ *
+ * @param columnType Zebra column type
+ *
+ * @return true if the type is RECORD, MAP or COLLECTION
+ */
public static boolean isSchemaType(ColumnType columnType) {
return ((columnType == RECORD) || (columnType == MAP) || (columnType == COLLECTION));
}
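
A hypothetical usage sketch (not from this commit) of the Pig/Zebra type mapping documented above:

    import org.apache.hadoop.zebra.schema.ColumnType;
    import org.apache.pig.data.DataType;

    public class ColumnTypeDemo {
      public static void main(String[] args) {
        // Pig type -> Zebra type, and back
        ColumnType ct = ColumnType.getTypeByPigDataType(DataType.CHARARRAY);
        System.out.println(ct.getName());                            // string
        System.out.println(ct.pigDataType() == DataType.CHARARRAY);  // true

        // only RECORD, MAP and COLLECTION carry an internal schema
        System.out.println(ColumnType.isSchemaType(ColumnType.MAP));  // true
        System.out.println(ColumnType.isSchemaType(ColumnType.LONG)); // false
      }
    }
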
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Tue Nov 24 19:54:19 2009
@@ -26,6 +26,7 @@
import java.util.HashSet;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.parser.TableSchemaParser;
@@ -37,6 +38,9 @@
private static final long schemaVersion = 1L;
+ /**
+ * Column Schema in Schema
+ */
public static class ColumnSchema {
private String name;
private ColumnType type;
@@ -73,6 +77,8 @@
/**
* access function to get the column name
+ *
+ * @return name of the column
*/
public String getName() {
return name;
@@ -80,6 +86,8 @@
/**
* access function to get the column type
+ *
+ * @return column type
*/
public ColumnType getType() {
return type;
@@ -87,6 +95,8 @@
/**
* access function to get the column name
+ *
+ * @return column index in the parent schema
*/
public int getIndex() {
return index;
@@ -128,8 +138,8 @@
/**
* Compare two field schema for equality
*
- * @param fschema
- * @param fother
+ * @param fschema one column schema to be compared
+ * @param fother the other column schema to be compared
* @return true if ColumnSchema are equal, false otherwise
*/
public static boolean equals(ColumnSchema fschema, ColumnSchema fother) {
@@ -195,28 +205,47 @@
/*
* Returns the schema for the next level structure, in record, collection
* and map.
+ *
+ * @return Schema of the column
*/
public Schema getSchema() {
return schema;
}
}
- /*
- * helper class to parse a column name string one section at a time and find
+ /**
+ * Helper class to parse a column name string one section at a time and find
* the required type for the parsed part.
*/
public static class ParsedName {
- public String mName;
- int mKeyOffset; // the offset where the keysstring starts
- public ColumnType mDT = ColumnType.ANY; // parent's type
+ private String mName;
+    private int mKeyOffset; // the offset where the key string starts
+ private ColumnType mDT = ColumnType.ANY; // parent's type
+ /**
+ * Default ctor
+ */
public ParsedName() {
}
+ /**
+ * Set the name
+ *
+ * @param name
+ * column name string
+ */
public void setName(String name) {
mName = name;
}
+ /**
+ * Set the name and type
+ *
+ * @param name
+ * column name string
+ * @param pdt
+ * column type
+ */
public void setName(String name, ColumnType pdt) {
mName = name;
mDT = pdt;
@@ -227,21 +256,42 @@
mKeyOffset = keyStrOffset;
}
- void setDT(ColumnType dt) {
+ /**
+ * Set the column type
+ *
+ * @param dt
+ * column type to be set with
+ */
+ public void setDT(ColumnType dt) {
mDT = dt;
}
- ColumnType getDT() {
+ /**
+ * Get the column type
+ *
+ * @return column type
+ */
+ public ColumnType getDT() {
return mDT;
}
- String getName() {
+ /**
+ * Get the column name
+ *
+ * @return column name
+ */
+ public String getName() {
return mName;
}
+ /**
+     * Parse one section of a fully qualified column name; also checks that
+     * the MAP and RECORD delimiters are used validly
+     *
+     * @param fs
+     *          column schema this column name is checked against
+ */
public String parseName(Schema.ColumnSchema fs) throws ParseException {
- // parse one sector of a fq name, also checks sanity of MAP and RECORD
- // fields
int fieldIndex, hashIndex;
fieldIndex = mName.indexOf('.');
hashIndex = mName.indexOf('#');
@@ -302,12 +352,13 @@
* alpha-numeric characters in column names.
*/
public Schema(String schema) throws ParseException {
- init(schema);
+ init(schema, false);
}
public Schema(String schema, boolean dupAllowed) throws ParseException {
dupColNameAllowed = dupAllowed;
- init(schema);
+    // if duplicates are allowed, the schema comes from a projection, and hence virtual columns are allowed
+ init(schema, dupAllowed);
}
public Schema(ColumnSchema fs) throws ParseException {
@@ -323,7 +374,7 @@
* please use only alpha-numeric characters in column names.
*/
public Schema(String[] columns) throws ParseException {
- init(columns);
+ init(columns, false);
}
/**
@@ -430,7 +481,7 @@
*/
public static Schema parse(String schema) throws ParseException {
Schema s = new Schema();
- s.init(schema);
+ s.init(schema, false);
return s;
}
@@ -609,15 +660,15 @@
*/
@Override
public void readFields(DataInput in) throws IOException {
- long version = org.apache.hadoop.io.file.tfile.Utils.readVLong(in);
+ long version = org.apache.hadoop.zebra.tfile.Utils.readVLong(in);
if (version > schemaVersion)
throw new IOException("Schema version is newer than that in software.");
// check-ups are needed for future versions for backward-compatibility
- String strSchema = org.apache.hadoop.io.file.tfile.Utils.readString(in);
+ String strSchema = org.apache.hadoop.zebra.tfile.Utils.readString(in);
try {
- init(strSchema);
+ init(strSchema, false);
}
catch (Exception e) {
throw new IOException(e.getMessage());
@@ -629,11 +680,11 @@
*/
@Override
public void write(DataOutput out) throws IOException {
- org.apache.hadoop.io.file.tfile.Utils.writeVLong(out, schemaVersion);
- org.apache.hadoop.io.file.tfile.Utils.writeString(out, toString());
+ org.apache.hadoop.zebra.tfile.Utils.writeVLong(out, schemaVersion);
+ org.apache.hadoop.zebra.tfile.Utils.writeString(out, toString());
}
- private void init(String[] columnNames) throws ParseException {
+ private void init(String[] columnNames, boolean virtualColAllowed) throws ParseException {
// each arg must carry a type or it will be treated as the default type
mFields = new ArrayList<ColumnSchema>();
mNames = new HashMap<String, ColumnSchema>();
@@ -647,7 +698,7 @@
}
TableSchemaParser parser =
new TableSchemaParser(new StringReader(sb.toString()));
- parser.RecordSchema(this);
+ parser.RecordSchema(this, virtualColAllowed);
}
private void init() {
@@ -655,7 +706,7 @@
mNames = new HashMap<String, ColumnSchema>();
}
- private void init(String columnString) throws ParseException {
+ private void init(String columnString, boolean virtualColAllowed) throws ParseException {
String trimmedColumnStr;
if (columnString == null || (trimmedColumnStr = columnString.trim()).isEmpty()) {
init();
@@ -666,7 +717,7 @@
for (int nx = 0; nx < parts.length; nx++) {
parts[nx] = parts[nx].trim();
}
- init(parts);
+ init(parts, virtualColAllowed);
}
/**
@@ -683,6 +734,11 @@
ParsedName pn = new ParsedName();
HashSet<String> keyentries;
for (int i = 0; i < ncols; i++) {
+ if (Projection.isVirtualColumn(projcols[i]))
+ {
+ result.add(null);
+ continue;
+ }
pn.setName(projcols[i]);
if ((cs = getColumnSchemaOnParsedName(pn)) != null) {
mycs = new ColumnSchema(pn.mName, cs.schema, cs.type);
@@ -944,7 +1000,7 @@
else {
if (!ColumnSchema.equals(fs, otherfs))
throw new ParseException("Different types of column " + fs.name
- + " in uioned tables");
+ + " in tables of a union");
}
}
}
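
A hypothetical usage sketch (not from this commit; getColumn's shape is inferred from the grammar file below) of parsing a schema string and reading back a column's metadata:

    import org.apache.hadoop.zebra.parser.ParseException;
    import org.apache.hadoop.zebra.schema.ColumnType;
    import org.apache.hadoop.zebra.schema.Schema;

    public class SchemaDemo {
      public static void main(String[] args) throws ParseException {
        Schema s = Schema.parse("name:string, count:long");
        Schema.ColumnSchema cs = s.getColumn(0);
        System.out.println(cs.getName());                       // name
        System.out.println(cs.getType() == ColumnType.STRING);  // true
        System.out.println(cs.getIndex());                      // 0
      }
    }
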
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt Tue Nov 24 19:54:19 2009
@@ -26,6 +26,7 @@
import java.io.*;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.types.Projection;
public class TableSchemaParser {
public static void main( String[] args )
@@ -39,6 +40,10 @@
objout.close();
System.out.println(schema.toString());
}
+ public Schema RecordSchema(Schema list) throws ParseException
+ {
+ return RecordSchema(list, false);
+ }
}
PARSER_END(TableSchemaParser)
@@ -288,7 +293,7 @@
{ return list; }
}
-Schema RecordSchema(Schema list) throws ParseException :
+Schema RecordSchema(Schema list, boolean virtualColAllowed) throws ParseException :
{
Schema item = null;
if (list == null)
@@ -298,8 +303,8 @@
{
(
(
- fs = ColumnSchema() {list.add(fs);}
- ( "," fs = ColumnSchema() {list.add(fs);})* <EOF>
+ fs = ColumnSchema() { if (!virtualColAllowed && fs != null && Projection.isVirtualColumn(fs.getName())) throw new ParseException("["+fs.getName()+"] is a reserved virtual column name"); list.add(fs);}
+ ( "," fs = ColumnSchema() { if (!virtualColAllowed && fs != null && Projection.isVirtualColumn(fs.getName())) throw new ParseException("["+fs.getName()+"] is a reserved virtual column name"); list.add(fs);})* <EOF>
)
)
{ return (list.getNumColumns() == 0 || (list.getNumColumns() == 1 && list.getColumn(0) == null) ? null : list); }
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Zebra Schema
+ */
+package org.apache.hadoop.zebra.schema;
+
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,979 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.zebra.tfile.CompareUtils.Scalar;
+import org.apache.hadoop.zebra.tfile.CompareUtils.ScalarComparator;
+import org.apache.hadoop.zebra.tfile.CompareUtils.ScalarLong;
+import org.apache.hadoop.zebra.tfile.Compression.Algorithm;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
+
+/**
+ * Block Compressed file, the underlying physical storage layer for TFile.
+ * BCFile provides the basic block-level compression for data blocks and meta
+ * blocks. It is separated from TFile as it may be used for other
+ * block-compressed file implementations.
+ */
+final class BCFile {
+  // the current version of the BCFile impl; increment it (major or minor)
+  // whenever enough changes are made
+ static final Version API_VERSION = new Version((short) 1, (short) 0);
+ static final Log LOG = LogFactory.getLog(BCFile.class);
+
+ /**
+ * Prevent the instantiation of BCFile objects.
+ */
+ private BCFile() {
+ // nothing
+ }
+
+ /**
+ * BCFile writer, the entry point for creating a new BCFile.
+ */
+ static public class Writer implements Closeable {
+ private final FSDataOutputStream out;
+ private final Configuration conf;
+ // the single meta block containing index of compressed data blocks
+ final DataIndex dataIndex;
+ // index for meta blocks
+ final MetaIndex metaIndex;
+ boolean blkInProgress = false;
+ private boolean metaBlkSeen = false;
+ private boolean closed = false;
+ long errorCount = 0;
+ // reusable buffers.
+ private BytesWritable fsOutputBuffer;
+
+ /**
+ * Call-back interface to register a block after a block is closed.
+ */
+ private static interface BlockRegister {
+ /**
+ * Register a block that is fully closed.
+ *
+ * @param raw
+ * The size of block in terms of uncompressed bytes.
+ * @param offsetStart
+ * The start offset of the block.
+ * @param offsetEnd
+ * One byte after the end of the block. Compressed block size is
+ * offsetEnd - offsetStart.
+ */
+ public void register(long raw, long offsetStart, long offsetEnd);
+ }
+
+ /**
+     * Intermediate class that maintains the state of a Writable Compression
+ * Block.
+ */
+ private static final class WBlockState {
+ private final Algorithm compressAlgo;
+ private Compressor compressor; // !null only if using native
+ // Hadoop compression
+ private final FSDataOutputStream fsOut;
+ private final long posStart;
+ private final SimpleBufferedOutputStream fsBufferedOutput;
+ private OutputStream out;
+
+ /**
+ * @param compressionAlgo
+       *          The compression algorithm to be used for compression.
+ * @throws IOException
+ */
+ public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut,
+ BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
+ this.compressAlgo = compressionAlgo;
+ this.fsOut = fsOut;
+ this.posStart = fsOut.getPos();
+
+ fsOutputBuffer.setCapacity(TFile.getFSOutputBufferSize(conf));
+
+ this.fsBufferedOutput =
+ new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
+ this.compressor = compressAlgo.getCompressor();
+
+ try {
+ this.out =
+ compressionAlgo.createCompressionStream(fsBufferedOutput,
+ compressor, 0);
+ } catch (IOException e) {
+ compressAlgo.returnCompressor(compressor);
+ throw e;
+ }
+ }
+
+ /**
+ * Get the output stream for BlockAppender's consumption.
+ *
+ * @return the output stream suitable for writing block data.
+ */
+ OutputStream getOutputStream() {
+ return out;
+ }
+
+ /**
+ * Get the current position in file.
+ *
+ * @return The current byte offset in underlying file.
+ * @throws IOException
+ */
+ long getCurrentPos() throws IOException {
+ return fsOut.getPos() + fsBufferedOutput.size();
+ }
+
+ long getStartPos() {
+ return posStart;
+ }
+
+ /**
+ * Current size of compressed data.
+ *
+       * @return the current compressed size of the block.
+ * @throws IOException
+ */
+ long getCompressedSize() throws IOException {
+ long ret = getCurrentPos() - posStart;
+ return ret;
+ }
+
+ /**
+ * Finishing up the current block.
+ */
+ public void finish() throws IOException {
+ try {
+ if (out != null) {
+ out.flush();
+ out = null;
+ }
+ } finally {
+ compressAlgo.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ /**
+ * Access point to stuff data into a block.
+ *
+ * TODO: Change DataOutputStream to something else that tracks the size as
+ * long instead of int. Currently, we will wrap around if the row block size
+ * is greater than 4GB.
+ */
+ public class BlockAppender extends DataOutputStream {
+ private final BlockRegister blockRegister;
+ private final WBlockState wBlkState;
+ @SuppressWarnings("hiding")
+ private boolean closed = false;
+
+ /**
+ * Constructor
+ *
+ * @param register
+ * the block register, which is called when the block is closed.
+ * @param wbs
+ * The writable compression block state.
+ */
+ BlockAppender(BlockRegister register, WBlockState wbs) {
+ super(wbs.getOutputStream());
+ this.blockRegister = register;
+ this.wBlkState = wbs;
+ }
+
+ /**
+ * Get the raw size of the block.
+ *
+ * @return the number of uncompressed bytes written through the
+ * BlockAppender so far.
+ * @throws IOException
+ */
+ public long getRawSize() throws IOException {
+ /**
+       * We expect the size() of a block not to exceed 4GB; size() wraps to a
+       * negative integer once it exceeds 2GB, hence the mask below.
+ */
+ return size() & 0x00000000ffffffffL;
+ }
+
+ /**
+ * Get the compressed size of the block in progress.
+ *
+ * @return the number of compressed bytes written to the underlying FS
+ * file. The size may be smaller than actual need to compress the
+ * all data written due to internal buffering inside the
+ * compressor.
+ * @throws IOException
+ */
+ public long getCompressedSize() throws IOException {
+ return wBlkState.getCompressedSize();
+ }
+
+ @Override
+ public void flush() {
+      // The downstream stream is a special kind of stream that finishes a
+      // compression block upon flush. So we disable flush() here.
+ }
+
+ /**
+ * Signaling the end of write to the block. The block register will be
+ * called for registering the finished block.
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed == true) {
+ return;
+ }
+ try {
+ ++errorCount;
+ wBlkState.finish();
+ blockRegister.register(getRawSize(), wBlkState.getStartPos(),
+ wBlkState.getCurrentPos());
+ --errorCount;
+ } finally {
+ closed = true;
+ blkInProgress = false;
+ }
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param fout
+ * FS output stream.
+ * @param compressionName
+ * Name of the compression algorithm, which will be used for all
+ * data blocks.
+ * @throws IOException
+ * @see Compression#getSupportedAlgorithms
+ */
+ public Writer(FSDataOutputStream fout, String compressionName,
+ Configuration conf) throws IOException {
+ if (fout.getPos() != 0) {
+ throw new IOException("Output file not at zero offset.");
+ }
+
+ this.out = fout;
+ this.conf = conf;
+ dataIndex = new DataIndex(compressionName);
+ metaIndex = new MetaIndex();
+ fsOutputBuffer = new BytesWritable();
+ Magic.write(fout);
+ }
+
+ /**
+ * Close the BCFile Writer. Attempting to use the Writer after calling
+ * <code>close</code> is not allowed and may lead to undetermined results.
+ */
+ public void close() throws IOException {
+ if (closed == true) {
+ return;
+ }
+
+ try {
+ if (errorCount == 0) {
+ if (blkInProgress == true) {
+ throw new IllegalStateException(
+ "Close() called with active block appender.");
+ }
+
+ // add metaBCFileIndex to metaIndex as the last meta block
+ BlockAppender appender =
+ prepareMetaBlock(DataIndex.BLOCK_NAME,
+ getDefaultCompressionAlgorithm());
+ try {
+ dataIndex.write(appender);
+ } finally {
+ appender.close();
+ }
+
+ long offsetIndexMeta = out.getPos();
+ metaIndex.write(out);
+
+ // Meta Index and the trailing section are written out directly.
+ out.writeLong(offsetIndexMeta);
+
+ API_VERSION.write(out);
+ Magic.write(out);
+ out.flush();
+ }
+ } finally {
+ closed = true;
+ }
+ }
+
+ private Algorithm getDefaultCompressionAlgorithm() {
+ return dataIndex.getDefaultCompressionAlgorithm();
+ }
+
+ private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo)
+ throws IOException, MetaBlockAlreadyExists {
+ if (blkInProgress == true) {
+ throw new IllegalStateException(
+ "Cannot create Meta Block until previous block is closed.");
+ }
+
+ if (metaIndex.getMetaByName(name) != null) {
+ throw new MetaBlockAlreadyExists("name=" + name);
+ }
+
+ MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
+ WBlockState wbs =
+ new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
+ BlockAppender ba = new BlockAppender(mbr, wbs);
+ blkInProgress = true;
+ metaBlkSeen = true;
+ return ba;
+ }
+
+ /**
+ * Create a Meta Block and obtain an output stream for adding data into the
+ * block. There can only be one BlockAppender stream active at any time.
+     * Regular Blocks may not be created after the first Meta Block. The caller
+ * must call BlockAppender.close() to conclude the block creation.
+ *
+ * @param name
+ * The name of the Meta Block. The name must not conflict with
+ * existing Meta Blocks.
+ * @param compressionName
+ * The name of the compression algorithm to be used.
+ * @return The BlockAppender stream
+ * @throws IOException
+ * @throws MetaBlockAlreadyExists
+ * If the meta block with the name already exists.
+ */
+ public BlockAppender prepareMetaBlock(String name, String compressionName)
+ throws IOException, MetaBlockAlreadyExists {
+ return prepareMetaBlock(name, Compression
+ .getCompressionAlgorithmByName(compressionName));
+ }
+
+ /**
+ * Create a Meta Block and obtain an output stream for adding data into the
+ * block. The Meta Block will be compressed with the same compression
+ * algorithm as data blocks. There can only be one BlockAppender stream
+ * active at any time. Regular Blocks may not be created after the first
+     * Meta Block. The caller must call BlockAppender.close() to conclude the
+ * block creation.
+ *
+ * @param name
+ * The name of the Meta Block. The name must not conflict with
+ * existing Meta Blocks.
+ * @return The BlockAppender stream
+ * @throws MetaBlockAlreadyExists
+ * If the meta block with the name already exists.
+ * @throws IOException
+ */
+ public BlockAppender prepareMetaBlock(String name) throws IOException,
+ MetaBlockAlreadyExists {
+ return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
+ }
+
+ /**
+ * Create a Data Block and obtain an output stream for adding data into the
+ * block. There can only be one BlockAppender stream active at any time.
+     * Data Blocks may not be created after the first Meta Block. The caller
+ * must call BlockAppender.close() to conclude the block creation.
+ *
+ * @return The BlockAppender stream
+ * @throws IOException
+ */
+ public BlockAppender prepareDataBlock() throws IOException {
+ if (blkInProgress == true) {
+ throw new IllegalStateException(
+ "Cannot create Data Block until previous block is closed.");
+ }
+
+ if (metaBlkSeen == true) {
+ throw new IllegalStateException(
+ "Cannot create Data Block after Meta Blocks.");
+ }
+
+ DataBlockRegister dbr = new DataBlockRegister();
+
+ WBlockState wbs =
+ new WBlockState(getDefaultCompressionAlgorithm(), out,
+ fsOutputBuffer, conf);
+ BlockAppender ba = new BlockAppender(dbr, wbs);
+ blkInProgress = true;
+ return ba;
+ }
+
+ /**
+ * Callback to make sure a meta block is added to the internal list when its
+ * stream is closed.
+ */
+ private class MetaBlockRegister implements BlockRegister {
+ private final String name;
+ private final Algorithm compressAlgo;
+
+ MetaBlockRegister(String name, Algorithm compressAlgo) {
+ this.name = name;
+ this.compressAlgo = compressAlgo;
+ }
+
+ public void register(long raw, long begin, long end) {
+ metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo,
+ new BlockRegion(begin, end - begin, raw)));
+ }
+ }
+
+ /**
+ * Callback to make sure a data block is added to the internal list when
+ * it's being closed.
+ *
+ */
+ private class DataBlockRegister implements BlockRegister {
+ DataBlockRegister() {
+ // do nothing
+ }
+
+ public void register(long raw, long begin, long end) {
+ dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
+ }
+ }
+ }
+
+ /**
+ * BCFile Reader, interface to read the file's data and meta blocks.
+ */
+ static public class Reader implements Closeable {
+ private final FSDataInputStream in;
+ private final Configuration conf;
+ final DataIndex dataIndex;
+ // Index for meta blocks
+ final MetaIndex metaIndex;
+ final Version version;
+
+ /**
+     * Intermediate class that maintains the state of a Readable Compression
+ * Block.
+ */
+ static private final class RBlockState {
+ private final Algorithm compressAlgo;
+ private Decompressor decompressor;
+ private final BlockRegion region;
+ private final InputStream in;
+
+ public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin,
+ BlockRegion region, Configuration conf) throws IOException {
+ this.compressAlgo = compressionAlgo;
+ this.region = region;
+ this.decompressor = compressionAlgo.getDecompressor();
+
+ try {
+ this.in =
+ compressAlgo
+ .createDecompressionStream(new BoundedRangeFileInputStream(
+ fsin, this.region.getOffset(), this.region
+ .getCompressedSize()), decompressor, TFile
+ .getFSInputBufferSize(conf));
+ } catch (IOException e) {
+ compressAlgo.returnDecompressor(decompressor);
+ throw e;
+ }
+ }
+
+ /**
+       * Get the input stream for BlockReader's consumption.
+       *
+       * @return the input stream suitable for reading block data.
+ */
+ public InputStream getInputStream() {
+ return in;
+ }
+
+ public String getCompressionName() {
+ return compressAlgo.getName();
+ }
+
+ public BlockRegion getBlockRegion() {
+ return region;
+ }
+
+ public void finish() throws IOException {
+ try {
+ in.close();
+ } finally {
+ compressAlgo.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+
+ /**
+ * Access point to read a block.
+ */
+ public static class BlockReader extends DataInputStream {
+ private final RBlockState rBlkState;
+ private boolean closed = false;
+
+ BlockReader(RBlockState rbs) {
+ super(rbs.getInputStream());
+ rBlkState = rbs;
+ }
+
+ /**
+ * Finishing reading the block. Release all resources.
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed == true) {
+ return;
+ }
+ try {
+ // Do not set rBlkState to null. People may access stats after calling
+ // close().
+ rBlkState.finish();
+ } finally {
+ closed = true;
+ }
+ }
+
+ /**
+ * Get the name of the compression algorithm used to compress the block.
+ *
+ * @return name of the compression algorithm.
+ */
+ public String getCompressionName() {
+ return rBlkState.getCompressionName();
+ }
+
+ /**
+ * Get the uncompressed size of the block.
+ *
+ * @return uncompressed size of the block.
+ */
+ public long getRawSize() {
+ return rBlkState.getBlockRegion().getRawSize();
+ }
+
+ /**
+ * Get the compressed size of the block.
+ *
+ * @return compressed size of the block.
+ */
+ public long getCompressedSize() {
+ return rBlkState.getBlockRegion().getCompressedSize();
+ }
+
+ /**
+ * Get the starting position of the block in the file.
+ *
+ * @return the starting position of the block in the file.
+ */
+ public long getStartPos() {
+ return rBlkState.getBlockRegion().getOffset();
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param fin
+ * FS input stream.
+ * @param fileLength
+ * Length of the corresponding file
+ * @throws IOException
+ */
+ public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
+ throws IOException {
+ this.in = fin;
+ this.conf = conf;
+
+ // move the cursor to the beginning of the tail, containing: offset to the
+ // meta block index, version and magic
+ fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
+ / Byte.SIZE);
+ long offsetIndexMeta = fin.readLong();
+ version = new Version(fin);
+ Magic.readAndVerify(fin);
+
+ if (!version.compatibleWith(BCFile.API_VERSION)) {
+ throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+ }
+
+ // read meta index
+ fin.seek(offsetIndexMeta);
+ metaIndex = new MetaIndex(fin);
+
+ // read data:BCFile.index, the data block index
+ BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
+ try {
+ dataIndex = new DataIndex(blockR);
+ } finally {
+ blockR.close();
+ }
+ }
+
+ /**
+ * Get the name of the default compression algorithm.
+ *
+ * @return the name of the default compression algorithm.
+ */
+ public String getDefaultCompressionName() {
+ return dataIndex.getDefaultCompressionAlgorithm().getName();
+ }
+
+ /**
+     * Get the version of the BCFile being read.
+     *
+     * @return version of the BCFile being read.
+ */
+ public Version getBCFileVersion() {
+ return version;
+ }
+
+ /**
+ * Get version of BCFile API.
+ *
+ * @return version of BCFile API.
+ */
+ public Version getAPIVersion() {
+ return API_VERSION;
+ }
+
+ /**
+ * Finishing reading the BCFile. Release all resources.
+ */
+ public void close() {
+ // nothing to be done now
+ }
+
+ /**
+ * Get the number of data blocks.
+ *
+ * @return the number of data blocks.
+ */
+ public int getBlockCount() {
+ return dataIndex.getBlockRegionList().size();
+ }
+
+ /**
+ * Stream access to a Meta Block.
+ *
+ * @param name
+ * meta block name
+ * @return BlockReader input stream for reading the meta block.
+ * @throws IOException
+ * @throws MetaBlockDoesNotExist
+ * The Meta Block with the given name does not exist.
+ */
+ public BlockReader getMetaBlock(String name) throws IOException,
+ MetaBlockDoesNotExist {
+ MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
+ if (imeBCIndex == null) {
+ throw new MetaBlockDoesNotExist("name=" + name);
+ }
+
+ BlockRegion region = imeBCIndex.getRegion();
+ return createReader(imeBCIndex.getCompressionAlgorithm(), region);
+ }
+
+ /**
+ * Stream access to a Data Block.
+ *
+ * @param blockIndex
+ * 0-based data block index.
+ * @return BlockReader input stream for reading the data block.
+ * @throws IOException
+ */
+ public BlockReader getDataBlock(int blockIndex) throws IOException {
+ if (blockIndex < 0 || blockIndex >= getBlockCount()) {
+ throw new IndexOutOfBoundsException(String.format(
+ "blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
+ }
+
+ BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
+ return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
+ }
+
+ private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
+ throws IOException {
+ RBlockState rbs = new RBlockState(compressAlgo, in, region, conf);
+ return new BlockReader(rbs);
+ }
+
+ /**
+ * Find the smallest Block index whose starting offset is greater than or
+ * equal to the specified offset.
+ *
+ * @param offset
+ * User-specific offset.
+ * @return the index to the data Block if such block exists; or -1
+ * otherwise.
+ */
+ public int getBlockIndexNear(long offset) {
+ ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
+ int idx =
+ Utils
+ .lowerBound(list, new ScalarLong(offset), new ScalarComparator());
+
+ if (idx == list.size()) {
+ return -1;
+ }
+
+ return idx;
+ }
+ }
+
+ /**
+ * Index for all Meta blocks.
+ */
+ static class MetaIndex {
+ // use a tree map, for getting a meta block entry by name
+ final Map<String, MetaIndexEntry> index;
+
+ // for write
+ public MetaIndex() {
+ index = new TreeMap<String, MetaIndexEntry>();
+ }
+
+ // for read, construct the map from the file
+ public MetaIndex(DataInput in) throws IOException {
+ int count = Utils.readVInt(in);
+ index = new TreeMap<String, MetaIndexEntry>();
+
+ for (int nx = 0; nx < count; nx++) {
+ MetaIndexEntry indexEntry = new MetaIndexEntry(in);
+ index.put(indexEntry.getMetaName(), indexEntry);
+ }
+ }
+
+ public void addEntry(MetaIndexEntry indexEntry) {
+ index.put(indexEntry.getMetaName(), indexEntry);
+ }
+
+ public MetaIndexEntry getMetaByName(String name) {
+ return index.get(name);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeVInt(out, index.size());
+
+ for (MetaIndexEntry indexEntry : index.values()) {
+ indexEntry.write(out);
+ }
+ }
+ }
+
+ /**
+ * An entry describes a meta block in the MetaIndex.
+ */
+ static final class MetaIndexEntry {
+ private final String metaName;
+ private final Algorithm compressionAlgorithm;
+ private final static String defaultPrefix = "data:";
+
+ private final BlockRegion region;
+
+ public MetaIndexEntry(DataInput in) throws IOException {
+ String fullMetaName = Utils.readString(in);
+ if (fullMetaName.startsWith(defaultPrefix)) {
+ metaName =
+ fullMetaName.substring(defaultPrefix.length(), fullMetaName
+ .length());
+ } else {
+ throw new IOException("Corrupted Meta region Index");
+ }
+
+ compressionAlgorithm =
+ Compression.getCompressionAlgorithmByName(Utils.readString(in));
+ region = new BlockRegion(in);
+ }
+
+ public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm,
+ BlockRegion region) {
+ this.metaName = metaName;
+ this.compressionAlgorithm = compressionAlgorithm;
+ this.region = region;
+ }
+
+ public String getMetaName() {
+ return metaName;
+ }
+
+ public Algorithm getCompressionAlgorithm() {
+ return compressionAlgorithm;
+ }
+
+ public BlockRegion getRegion() {
+ return region;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeString(out, defaultPrefix + metaName);
+ Utils.writeString(out, compressionAlgorithm.getName());
+
+ region.write(out);
+ }
+ }
+
+ /**
+ * Index of all compressed data blocks.
+ */
+ static class DataIndex {
+ final static String BLOCK_NAME = "BCFile.index";
+
+ private final Algorithm defaultCompressionAlgorithm;
+
+ // for data blocks, each entry specifies a block's offset, compressed size
+ // and raw size
+ private final ArrayList<BlockRegion> listRegions;
+
+ // for read, deserialized from a file
+ public DataIndex(DataInput in) throws IOException {
+ defaultCompressionAlgorithm =
+ Compression.getCompressionAlgorithmByName(Utils.readString(in));
+
+ int n = Utils.readVInt(in);
+ listRegions = new ArrayList<BlockRegion>(n);
+
+ for (int i = 0; i < n; i++) {
+ BlockRegion region = new BlockRegion(in);
+ listRegions.add(region);
+ }
+ }
+
+ // for write
+ public DataIndex(String defaultCompressionAlgorithmName) {
+ this.defaultCompressionAlgorithm = Compression
+ .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
+ listRegions = new ArrayList<BlockRegion>();
+ }
+
+ public Algorithm getDefaultCompressionAlgorithm() {
+ return defaultCompressionAlgorithm;
+ }
+
+ public ArrayList<BlockRegion> getBlockRegionList() {
+ return listRegions;
+ }
+
+ public void addBlockRegion(BlockRegion region) {
+ listRegions.add(region);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeString(out, defaultCompressionAlgorithm.getName());
+
+ Utils.writeVInt(out, listRegions.size());
+
+ for (BlockRegion region : listRegions) {
+ region.write(out);
+ }
+ }
+ }
+
+ /**
+ * Magic number uniquely identifying a BCFile in the header/footer.
+ */
+ static final class Magic {
+ private final static byte[] AB_MAGIC_BCFILE =
+ {
+ // ... total of 16 bytes
+ (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91,
+ (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf,
+ (byte) 0x41, (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1,
+ (byte) 0x50 };
+
+ public static void readAndVerify(DataInput in) throws IOException {
+ byte[] abMagic = new byte[size()];
+ in.readFully(abMagic);
+
+ // Check against AB_MAGIC_BCFILE; if it does not match, throw an
+ // exception.
+ if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
+ throw new IOException("Not a valid BCFile.");
+ }
+ }
+
+ public static void write(DataOutput out) throws IOException {
+ out.write(AB_MAGIC_BCFILE);
+ }
+
+ public static int size() {
+ return AB_MAGIC_BCFILE.length;
+ }
+ }
+
+ /**
+ * Block region.
+ */
+ static final class BlockRegion implements Scalar {
+ private final long offset;
+ private final long compressedSize;
+ private final long rawSize;
+
+ public BlockRegion(DataInput in) throws IOException {
+ offset = Utils.readVLong(in);
+ compressedSize = Utils.readVLong(in);
+ rawSize = Utils.readVLong(in);
+ }
+
+ public BlockRegion(long offset, long compressedSize, long rawSize) {
+ this.offset = offset;
+ this.compressedSize = compressedSize;
+ this.rawSize = rawSize;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeVLong(out, offset);
+ Utils.writeVLong(out, compressedSize);
+ Utils.writeVLong(out, rawSize);
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getCompressedSize() {
+ return compressedSize;
+ }
+
+ public long getRawSize() {
+ return rawSize;
+ }
+
+ @Override
+ public long magnitude() {
+ return offset;
+ }
+ }
+}
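
A minimal round-trip sketch for the BlockRegion encoding above: the three
fields are written as consecutive VLongs, and the DataInput constructor reads
them back in the same order. This assumes the code lives in the same package
as BCFile, since BlockRegion is a package-private nested class; the sizes are
illustrative only.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class BlockRegionRoundTrip {
      public static void main(String[] args) throws IOException {
        // A region is three VLongs: offset, compressed size, raw size.
        BCFile.BlockRegion region = new BCFile.BlockRegion(1024L, 512L, 2048L);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        region.write(new DataOutputStream(bos));

        // The DataInput constructor consumes the same three VLongs.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        BCFile.BlockRegion copy = new BCFile.BlockRegion(in);
        System.out.println(copy.getOffset() == 1024L
            && copy.getCompressedSize() == 512L
            && copy.getRawSize() == 2048L); // prints true
      }
    }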
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A byte-array-backed output stream with a limit. The limit must not exceed
+ * the buffer capacity. The object can be reused through the
+ * <code>reset</code> API, choosing a different limit for each round.
+ */
+public class BoundedByteArrayOutputStream extends OutputStream {
+ private final byte[] buffer;
+ private int limit;
+ private int count;
+
+ /**
+ * Create a BoundedByteArrayOutputStream with the specified
+ * capacity
+ * @param capacity The capacity of the underlying byte array
+ */
+ public BoundedByteArrayOutputStream(int capacity) {
+ this(capacity, capacity);
+ }
+
+ /**
+ * Create a BoundedByteArrayOutputStream with the specified
+ * capacity and limit.
+ * @param capacity The capacity of the underlying byte array
+ * @param limit The maximum number of bytes that may be written
+ */
+ public BoundedByteArrayOutputStream(int capacity, int limit) {
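+ // (capacity | limit) is negative iff either value is negative, so this
+ // single check also rejects negative capacity or limit arguments.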
+ if ((capacity < limit) || (capacity | limit) < 0) {
+ throw new IllegalArgumentException("Invalid capacity/limit");
+ }
+ this.buffer = new byte[capacity];
+ this.limit = limit;
+ this.count = 0;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (count >= limit) {
+ throw new EOFException("Reaching the limit of the buffer.");
+ }
+ buffer[count++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+ || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+
+ if (count + len > limit) {
+ throw new EOFException("Reach the limit of the buffer");
+ }
+
+ System.arraycopy(b, off, buffer, count, len);
+ count += len;
+ }
+
+ /**
+ * Reset the stream and set a new limit.
+ * @param newlim the new limit
+ */
+ public void reset(int newlim) {
+ if (newlim > buffer.length) {
+ throw new IndexOutOfBoundsException("Limit exceeds buffer size");
+ }
+ this.limit = newlim;
+ this.count = 0;
+ }
+
+ /** Reset the buffer */
+ public void reset() {
+ this.limit = buffer.length;
+ this.count = 0;
+ }
+
+ /** Return the current limit */
+ public int getLimit() {
+ return limit;
+ }
+
+ /** Returns the underlying buffer.
+ * Data is only valid to {@link #size()}.
+ */
+ public byte[] getBuffer() {
+ return buffer;
+ }
+
+ /** Returns the length of the valid data
+ * currently in the buffer.
+ */
+ public int size() {
+ return count;
+ }
+}
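
A short usage sketch of the limit-then-reset cycle described in the class
comment. The class is public in org.apache.hadoop.zebra.tfile; the buffer
sizes and byte values here are illustrative only.

    import java.io.IOException;

    import org.apache.hadoop.zebra.tfile.BoundedByteArrayOutputStream;

    public class BoundedStreamDemo {
      public static void main(String[] args) throws IOException {
        // 16-byte backing buffer, but only 4 writable bytes this round.
        BoundedByteArrayOutputStream out =
            new BoundedByteArrayOutputStream(16, 4);
        out.write(new byte[] { 1, 2, 3, 4 });
        // Writing one more byte here would throw EOFException.

        // Reuse the same backing buffer with a larger limit.
        out.reset(8);
        out.write(new byte[] { 9, 8, 7, 6, 5 });
        System.out.println(out.size()); // prints 5
      }
    }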
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+/**
+ * BoundedRangeFileInputStream abstracts a contiguous region of a Hadoop
+ * FSDataInputStream as a regular input stream. Multiple
+ * BoundedRangeFileInputStream instances may be created on top of the same
+ * FSDataInputStream without interfering with each other.
+ */
+class BoundedRangeFileInputStream extends InputStream {
+
+ private FSDataInputStream in;
+ private long pos;
+ private long end;
+ private long mark;
+ private final byte[] oneByte = new byte[1];
+
+ /**
+ * Constructor
+ *
+ * @param in
+ * The FSDataInputStream we connect to.
+ * @param offset
+ * Beginning offset of the region.
+ * @param length
+ * Length of the region.
+ *
+ * The actual length of the region may be smaller if (offset +
+ * length) goes beyond the end of the FS input stream.
+ */
+ public BoundedRangeFileInputStream(FSDataInputStream in, long offset,
+ long length) {
+ if (offset < 0 || length < 0) {
+ throw new IndexOutOfBoundsException("Invalid offset/length: " + offset
+ + "/" + length);
+ }
+
+ this.in = in;
+ this.pos = offset;
+ this.end = offset + length;
+ this.mark = -1;
+ }
+
+ @Override
+ public int available() throws IOException {
+ int avail = in.available();
+ if (pos + avail > end) {
+ avail = (int) (end - pos);
+ }
+
+ return avail;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int ret = read(oneByte);
+ if (ret == 1) return oneByte[0] & 0xff;
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
+ if (n == 0) return -1;
+ int ret = 0;
+ synchronized (in) {
+ in.seek(pos);
+ ret = in.read(b, off, n);
+ }
+ if (ret < 0) {
+ end = pos;
+ return -1;
+ }
+ pos += ret;
+ return ret;
+ }
+
+ /*
+ * Note: skipping may advance the position beyond the actual end of the
+ * underlying file, since the region end is not validated against the
+ * file size.
+ */
+ @Override
+ public long skip(long n) throws IOException {
+ long len = Math.min(n, end - pos);
+ pos += len;
+ return len;
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ mark = pos;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ if (mark < 0) throw new IOException("Resetting to invalid mark");
+ pos = mark;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ // Invalidate the state of the stream.
+ in = null;
+ pos = end;
+ mark = -1;
+ }
+}
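
A sketch of carving two independent regions out of one shared
FSDataInputStream, as the class comment describes. The class is
package-private, so this assumes same-package access; the path and region
sizes are illustrative only.

    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RangeDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/data.bin"));

        // Two views over disjoint regions of the same underlying stream.
        InputStream first = new BoundedRangeFileInputStream(in, 0, 128);
        InputStream second = new BoundedRangeFileInputStream(in, 128, 128);

        // Each read seeks then reads while synchronized on the shared
        // stream, so the two views do not disturb each other's positions.
        byte[] buf = new byte[64];
        int n1 = first.read(buf);
        int n2 = second.read(buf);
        System.out.println("read " + n1 + " and " + n2 + " bytes");

        // close() only invalidates the view; the shared stream stays open.
        first.close();
        second.close();
        in.close();
      }
    }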
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * Adaptor class to wrap byte-array-backed objects (including plain Java
+ * byte arrays) as RawComparable objects.
+ */
+public final class ByteArray implements RawComparable {
+ private final byte[] buffer;
+ private final int offset;
+ private final int len;
+
+ /**
+ * Construct a ByteArray from a {@link BytesWritable}.
+ *
+ * @param other
+ * the BytesWritable to wrap.
+ */
+ public ByteArray(BytesWritable other) {
+ this(other.getBytes(), 0, other.getLength());
+ }
+
+ /**
+ * Wrap a whole byte array as a RawComparable.
+ *
+ * @param buffer
+ * the byte array buffer.
+ */
+ public ByteArray(byte[] buffer) {
+ this(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Wrap a partial byte array as a RawComparable.
+ *
+ * @param buffer
+ * the byte array buffer.
+ * @param offset
+ * the starting offset
+ * @param len
+ * the length of the consecutive bytes to be wrapped.
+ */
+ public ByteArray(byte[] buffer, int offset, int len) {
+ if ((offset | len | (buffer.length - offset - len)) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ this.buffer = buffer;
+ this.offset = offset;
+ this.len = len;
+ }
+
+ /**
+ * @return the underlying buffer.
+ */
+ @Override
+ public byte[] buffer() {
+ return buffer;
+ }
+
+ /**
+ * @return the offset in the buffer.
+ */
+ @Override
+ public int offset() {
+ return offset;
+ }
+
+ /**
+ * @return the size of the byte array.
+ */
+ @Override
+ public int size() {
+ return len;
+ }
+}
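
A sketch comparing two ByteArray views lexicographically through the
RawComparable accessors. The use of WritableComparator.compareBytes from
hadoop-common is an assumption for illustration; the comparator that TFile
actually wires up may differ.

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.zebra.tfile.ByteArray;

    public class ByteArrayDemo {
      public static void main(String[] args) {
        ByteArray a = new ByteArray(new BytesWritable(new byte[] { 1, 2, 3 }));
        // Wrap only bytes [1, 4) of the backing array: {2, 3, 4}.
        ByteArray b = new ByteArray(new byte[] { 1, 2, 3, 4, 5 }, 1, 3);

        int cmp = WritableComparator.compareBytes(
            a.buffer(), a.offset(), a.size(),
            b.buffer(), b.offset(), b.size());
        System.out.println(cmp < 0 ? "a < b" : cmp > 0 ? "a > b" : "a == b");
      }
    }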