You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/05 22:03:00 UTC
svn commit: r833166 [1/5] - in /hadoop/pig/trunk/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/
src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/pig/ src/java/org/apache/hadoop/zebra/pi...
Author: gates
Date: Thu Nov 5 21:02:57 2009
New Revision: 833166
URL: http://svn.apache.org/viewvc?rev=833166&view=rev
Log:
PIG-997 Sorted Table Support by Zebra.
Added:
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSortedBasicTableSplits.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSampleSortedTable.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSortedTableZebraKeyGenerator.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinInteger.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinMultipleColsSort.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorer.java
Removed:
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/build-contrib.xml
hadoop/pig/trunk/contrib/zebra/build.xml
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMap.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMapOfRecord.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMixedType1.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNegative.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNonDefaultWholeMapSplit.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord2Map.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord3Map.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecordMap.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollection.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapType.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestUnionMixedTypes.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestColumnGroupName.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestColumnSecurity.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageCollection.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMap.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMisc1.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMisc2.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMisc3.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageRecord.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorePrimitive.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Thu Nov 5 21:02:57 2009
@@ -5,6 +5,8 @@
INCOMPATIBLE CHANGES
IMPROVEMENTS
+
+ PIG-997 Sorted Table Support by Zebra (yanz via gates)
PIG-996 Add findbugs, checkstyle, and clover to zebra build file (chaow via
gates)
Modified: hadoop/pig/trunk/contrib/zebra/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/build-contrib.xml?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/build-contrib.xml (original)
+++ hadoop/pig/trunk/contrib/zebra/build-contrib.xml Thu Nov 5 21:02:57 2009
@@ -44,7 +44,7 @@
<property name="conf.dir" location="${pig.root}/conf"/>
<property name="test.junit.output.format" value="plain"/>
<property name="test.output" value="no"/>
- <property name="test.timeout" value="900000"/>
+ <property name="test.timeout" value="9000000"/>
<property name="build.dir" location="${pig.root}/build/contrib/${name}"/>
<property name="build.javadoc"
location="${pig.root}/build/contrib/${name}/docs"/>
@@ -53,6 +53,7 @@
<property name="build.examples" location="${build.dir}/examples"/>
<property name="pig.log.dir" location="${build.dir}/test/logs"/>
<property name="hadoop.jarfile" value="hadoop20.jar" />
+ <property name="pig.jarfile" value="pig-0.6.0-dev-core.jar" />
<property name="hbase.jarfile" value="hbase-0.18.1.jar" />
<property name="hbase.test.jarfile" value="hbase-0.18.1-test.jar" />
@@ -103,6 +104,9 @@
<include name="${hbase.jarfile}" />
<include name="${hbase.test.jarfile}" />
</fileset>
+ <fileset dir="${pig.root}/build">
+ <include name="${pig.jarfile}" />
+ </fileset>
<fileset dir="${pig.root}/build/ivy/lib">
<include name="**/*.jar"/>
</fileset>
Modified: hadoop/pig/trunk/contrib/zebra/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/build.xml?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/build.xml (original)
+++ hadoop/pig/trunk/contrib/zebra/build.xml Thu Nov 5 21:02:57 2009
@@ -170,9 +170,16 @@
<classpath refid="test.classpath"/>
<formatter type="${test.junit.output.format}" />
+ <!-- For the time being, we disable two test cases that need a real cluster to run -->
<batchtest todir="${build.test}">
- <fileset dir="${src.test}"
- includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+ <fileset dir="${src.test}" includes="**/Test*.java" excludes="**/TestCheckin*.java">
+ <not>
+ <filename name="**/TestRealCluster.java"/>
+ </not>
+ <not>
+ <filename name="**/TestColumnSecurity.java"/>
+ </not>
+ </fileset>
</batchtest>
</junit>
<fail if="tests.failed">Tests failed!</fail>
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Thu Nov 5 21:02:57 2009
@@ -56,6 +56,7 @@
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.parser.TableSchemaParser;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
import org.apache.pig.data.Tuple;
/**
@@ -82,13 +83,11 @@
private final static String BT_SCHEMA_FILE = ".btschema";
// schema version
private final static Version SCHEMA_VERSION =
- new Version((short) 1, (short) 0);
+ new Version((short) 1, (short) 1);
// name of the BasicTable meta-data file
private final static String BT_META_FILE = ".btmeta";
// column group prefix
private final static String CGPathPrefix = "CG";
- // default comparator to "memcmp"
- private final static String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
private final static String DELETED_CG_PREFIX = ".deleted-";
@@ -288,7 +287,8 @@
schema = schemaFile.getLogical();
projection = new Projection(schema);
String storage = schemaFile.getStorageString();
- partition = new Partition(schema, projection, storage);
+ String comparator = schemaFile.getComparator();
+ partition = new Partition(schema, projection, storage, comparator);
for (int nx = 0; nx < numCGs; nx++) {
if (!schemaFile.isCGDeleted(nx)) {
colGroups[nx] =
@@ -349,6 +349,14 @@
}
/**
+ * @return the list of sorted columns
+ */
+ public SortInfo getSortInfo()
+ {
+ return schemaFile.getSortInfo();
+ }
+
+ /**
* Set the projection for the reader. This will affect calls to
* {@link #getScanner(RangeSplit, boolean)},
* {@link #getScanner(BytesWritable, BytesWritable, boolean)},
@@ -368,7 +376,7 @@
this.projection = new Projection(schemaFile.getLogical());
partition =
new Partition(schemaFile.getLogical(), this.projection, schemaFile
- .getStorageString());
+ .getStorageString(), schemaFile.getComparator());
}
else {
/**
@@ -379,7 +387,7 @@
new Projection(schemaFile.getLogical(), projection);
partition =
new Partition(schemaFile.getLogical(), this.projection, schemaFile
- .getStorageString());
+ .getStorageString(), schemaFile.getComparator());
}
inferredMapping = false;
}
@@ -432,7 +440,7 @@
kd.add(colGroups[nx].getKeyDistribution(n));
}
}
- if (kd.size() > (int) (n * 1.5)) {
+ if (n >= 0 && kd.size() > (int) (n * 1.5)) {
kd.resize(n);
}
return kd;
@@ -1031,6 +1039,8 @@
private boolean closed = true;
ColumnGroup.Writer[] colGroups;
Partition partition;
+ boolean sorted;
+ private boolean finished;
Tuple[] cgTuples;
/**
@@ -1056,35 +1066,34 @@
* implementation, the schema of a table is a comma or
* semicolon-separated list of column names, such as
* "FirstName, LastName; Sex, Department".
- * @param sorted
- * Whether the table to be created is sorted or not. If set to
- * true, we expect the rows inserted by every inserter created from
- * this Writer must be sorted. Additionally, there exists an
- * ordering of the inserters Ins-1, Ins-2, ... such that the rows
- * created by Ins-1, followed by rows created by Ins-2, ... form a
- * total order.
+ * @param sortColumns
+ * String of comma-separated sorted columns: null for unsorted tables
+ * @param comparator
+ * Name of the comparator used in sorted tables
* @param conf
* Optional Configuration objects.
*
* @throws IOException
* @see Schema
*/
- public Writer(Path path, String btSchemaString, String btStorageString,
- boolean sorted, Configuration conf) throws IOException {
+ public Writer(Path path, String btSchemaString, String btStorageString, String sortColumns,
+ String comparator, Configuration conf) throws IOException {
try {
schemaFile =
- new SchemaFile(path, btSchemaString, btStorageString,
- DEFAULT_COMPARATOR, sorted, conf);
+ new SchemaFile(path, btSchemaString, btStorageString, sortColumns,
+ comparator, conf);
partition = schemaFile.getPartition();
int numCGs = schemaFile.getNumOfPhysicalSchemas();
colGroups = new ColumnGroup.Writer[numCGs];
cgTuples = new Tuple[numCGs];
+ sorted = schemaFile.isSorted();
for (int nx = 0; nx < numCGs; nx++) {
colGroups[nx] =
new ColumnGroup.Writer(
new Path(path, schemaFile.getName(nx)),
schemaFile.getPhysicalSchema(nx),
sorted,
+ comparator,
schemaFile.getName(nx),
schemaFile.getSerializer(nx),
schemaFile.getCompressor(nx),
@@ -1130,7 +1139,16 @@
}
/**
- * Reopen an already created BasicTable for writing. Excepiton will be
+ * a wrapper to support backward compatible constructor
+ */
+ public Writer(Path path, String btSchemaString, String btStorageString,
+ Configuration conf) throws IOException {
+ this(path, btSchemaString, btStorageString, null, null, conf);
+ }
+
+ /**
+ /**
+ * Reopen an already created BasicTable for writing. Exception will be
* thrown if the table is already closed, or is in the process of being
* closed.
*/
@@ -1139,6 +1157,7 @@
schemaFile = new SchemaFile(path, conf);
int numCGs = schemaFile.getNumOfPhysicalSchemas();
partition = schemaFile.getPartition();
+ sorted = schemaFile.isSorted();
colGroups = new ColumnGroup.Writer[numCGs];
cgTuples = new Tuple[numCGs];
for (int nx = 0; nx < numCGs; nx++) {
@@ -1184,8 +1203,8 @@
* make the table immutable.
*/
public void finish() throws IOException {
- if (closed) return;
- closed = true;
+ if (finished) return;
+ finished = true;
try {
for (int nx = 0; nx < colGroups.length; nx++) {
if (colGroups[nx] != null) {
@@ -1220,6 +1239,8 @@
public void close() throws IOException {
if (closed) return;
closed = true;
+ if (!finished)
+ finish();
try {
for (int nx = 0; nx < colGroups.length; nx++) {
if (colGroups[nx] != null) {
@@ -1254,6 +1275,22 @@
public Schema getSchema() {
return schemaFile.getLogical();
}
+
+ /**
+ * @return sortness
+ */
+ public boolean isSorted() {
+ return sorted;
+ }
+
+ /**
+ * Get the list of sorted columns.
+ * @return the list of sorted columns
+ */
+ public SortInfo getSortInfo()
+ {
+ return schemaFile.getSortInfo();
+ }
/**
* Get a inserter with a given name.
@@ -1386,7 +1423,7 @@
}
if (finishWriter) {
try {
- BasicTable.Writer.this.close();
+ BasicTable.Writer.this.finish();
}
catch (Exception e) {
// no-op
@@ -1418,6 +1455,7 @@
Schema[] physical;
Partition partition;
boolean sorted;
+ SortInfo sortInfo = null;
String storage;
CGSchema[] cgschemas;
@@ -1436,17 +1474,21 @@
}
// ctor for writing
- public SchemaFile(Path path, String btSchemaStr, String btStorageStr,
- String btComparator, boolean sorted, Configuration conf)
+ public SchemaFile(Path path, String btSchemaStr, String btStorageStr, String sortColumns,
+ String btComparator, Configuration conf)
throws IOException {
storage = btStorageStr;
- this.comparator = btComparator;
try {
- partition = new Partition(btSchemaStr, btStorageStr);
+ partition = new Partition(btSchemaStr, btStorageStr, btComparator, sortColumns);
}
catch (Exception e) {
throw new IOException("Partition constructor failed :" + e.getMessage());
}
+ this.sortInfo = partition.getSortInfo();
+ this.sorted = partition.isSorted();
+ this.comparator = (this.sortInfo == null ? null : this.sortInfo.getComparator());
+ if (this.comparator == null)
+ this.comparator = "";
logical = partition.getSchema();
cgschemas = partition.getCGSchemas();
physical = new Schema[cgschemas.length];
@@ -1454,7 +1496,7 @@
physical[nx] = cgschemas[nx].getSchema();
}
cgDeletedFlags = new boolean[physical.length];
- this.sorted = sorted;
+
version = SCHEMA_VERSION;
// write out the schema
@@ -1473,6 +1515,10 @@
return sorted;
}
+ public SortInfo getSortInfo() {
+ return sortInfo;
+ }
+
public Schema getLogical() {
return logical;
}
@@ -1555,6 +1601,15 @@
WritableUtils.writeString(outSchema, physical[nx].toString());
}
WritableUtils.writeVInt(outSchema, sorted ? 1 : 0);
+ WritableUtils.writeVInt(outSchema, sortInfo == null ? 0 : sortInfo.size());
+ if (sortInfo != null && sortInfo.size() > 0)
+ {
+ String[] sortedCols = sortInfo.getSortColumnNames();
+ for (int i = 0; i < sortInfo.size(); i++)
+ {
+ WritableUtils.writeString(outSchema, sortedCols[i]);
+ }
+ }
outSchema.close();
}
@@ -1583,7 +1638,7 @@
}
storage = WritableUtils.readString(in);
try {
- partition = new Partition(logicalStr, storage);
+ partition = new Partition(logicalStr, storage, comparator);
}
catch (Exception e) {
throw new IOException("Partition constructor failed :" + e.getMessage());
@@ -1606,6 +1661,19 @@
}
sorted = WritableUtils.readVInt(in) == 1 ? true : false;
setCGDeletedFlags(path, conf);
+ if (version.compareTo(new Version((short)1, (short)0)) > 0)
+ {
+ int numSortColumns = WritableUtils.readVInt(in);
+ if (numSortColumns > 0)
+ {
+ String[] sortColumnStr = new String[numSortColumns];
+ for (int i = 0; i < numSortColumns; i++)
+ {
+ sortColumnStr[i] = WritableUtils.readString(in);
+ }
+ sortInfo = SortInfo.parse(SortInfo.toSortString(sortColumnStr), logical, comparator);
+ }
+ }
in.close();
}
@@ -1677,10 +1745,25 @@
Path path = new Path(file);
try {
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ String schemaStr = reader.getBTSchemaString();
+ String storageStr = reader.getStorageString();
IOutils.indent(out, indent);
- out.printf("Schema : %s\n", reader.getBTSchemaString());
+ out.printf("Schema : %s\n", schemaStr);
IOutils.indent(out, indent);
- out.printf("Storage Information : %s\n", reader.getStorageString());
+ out.printf("Storage Information : %s\n", storageStr);
+ SortInfo sortInfo = reader.getSortInfo();
+ if (sortInfo != null && sortInfo.size() > 0)
+ {
+ IOutils.indent(out, indent);
+ String[] sortedCols = sortInfo.getSortColumnNames();
+ out.println("Sorted Columns :");
+ for (int nx = 0; nx < sortedCols.length; nx++) {
+ if (nx > 0)
+ out.printf(" , ");
+ out.printf("%s", sortedCols[nx]);
+ }
+ out.printf("\n");
+ }
IOutils.indent(out, indent);
out.println("Column Groups within the Basic Table :");
for (int nx = 0; nx < reader.colGroups.length; nx++) {
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Thu Nov 5 21:02:57 2009
@@ -1051,7 +1051,7 @@
Configuration conf;
FileSystem fs;
CGSchema cgschema;
- private boolean finished;
+ private boolean finished, closed;
/**
* Create a ColumnGroup writer. The semantics are as follows:
@@ -1095,11 +1095,25 @@
public Writer(Path path, String schema, boolean sorted, String name, String serializer,
String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
throws IOException, ParseException {
- this(path, new Schema(schema), sorted, name, serializer, compressor, owner, group, perm, overwrite,
+ this(path, new Schema(schema), sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
conf);
}
public Writer(Path path, Schema schema, boolean sorted, String name, String serializer,
+ String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+ throws IOException, ParseException {
+ this(path, schema, sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
+ conf);
+ }
+
+ public Writer(Path path, String schema, boolean sorted, String comparator, String name, String serializer,
+ String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+ throws IOException, ParseException {
+ this(path, new Schema(schema), sorted, comparator, name, serializer, compressor, owner, group, perm, overwrite,
+ conf);
+ }
+
+ public Writer(Path path, Schema schema, boolean sorted, String comparator, String name, String serializer,
String compressor, String owner, String group, short perm, boolean overwrite, Configuration conf)
throws IOException, ParseException {
this.path = path;
@@ -1118,7 +1132,7 @@
checkPath(path, true);
- cgschema = new CGSchema(schema, sorted, name, serializer, compressor, owner, group, perm);
+ cgschema = new CGSchema(schema, sorted, comparator, name, serializer, compressor, owner, group, perm);
CGSchema sfNew = CGSchema.load(fs, path);
if (sfNew != null) {
// compare input with on-disk schema.
@@ -1162,7 +1176,10 @@
@Override
public void close() throws IOException {
if (!finished) {
- finished = true;
+ finish();
+ }
+ if (!closed) {
+ closed = true;
createIndex();
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java Thu Nov 5 21:02:57 2009
@@ -21,7 +21,18 @@
import java.io.DataOutputStream;
import java.io.PrintStream;
+/**
+ * Helper for Zebra I/O
+ */
public class IOutils {
+ /**
+ * indent of some spaces
+ *
+ * @param os
+ * print stream the indent space to be inserted
+ * @param amount
+ * the number of spaces to be indented
+ */
public static void indent(PrintStream os, int amount)
{
for (int i = 0; i < amount; i++)
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Thu Nov 5 21:02:57 2009
@@ -24,6 +24,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.io.file.tfile.ByteArray;
/**
* Class used to convey the information of how on-disk data are distributed
@@ -149,7 +150,8 @@
* Get the block distribution of all data that maps to the key bucket.
*/
public BlockDistribution getBlockDistribution(BytesWritable key) {
- BlockDistribution bInfo = data.get(key);
+ ByteArray key0 = new ByteArray(key.get(), 0, key.getSize());
+ BlockDistribution bInfo = data.get(key0);
if (bInfo == null) {
throw new IllegalArgumentException("Invalid key");
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java Thu Nov 5 21:02:57 2009
@@ -93,7 +93,6 @@
/**
* Get the projection's schema
- * @return
*/
public Schema getSchema();
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java Thu Nov 5 21:02:57 2009
@@ -16,6 +16,6 @@
*/
/**
- * Physical I/O management of Hadoop Tables.
+ * Physical I/O management of Hadoop Zebra Tables.
*/
package org.apache.hadoop.zebra.io;
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java Thu Nov 5 21:02:57 2009
@@ -30,8 +30,11 @@
import org.apache.hadoop.zebra.io.TableInserter;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.types.SortInfo;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.pig.data.Tuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
+
/**
* {@link org.apache.hadoop.mapred.OutputFormat} class for creating a
@@ -113,7 +116,10 @@
private static final String OUTPUT_SCHEMA = "mapred.lib.table.output.schema";
private static final String OUTPUT_STORAGEHINT =
"mapred.lib.table.output.storagehint";
- private static final String OUTPUT_SORTED = "mapred.lib.table.output.sorted";
+ private static final String OUTPUT_SORTCOLUMNS =
+ "mapred.lib.table.output.sortcolumns";
+ private static final String OUTPUT_COMPARATOR =
+ "mapred.lib.table.output.comparator";
/**
* Set the output path of the BasicTable in JobConf
@@ -174,6 +180,58 @@
return new Schema(schema);
}
+ private static KeyGenerator makeKeyBuilder(byte[] elems) {
+ ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+ for (int i = 0; i < elems.length; ++i) {
+ exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+ }
+ return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+ }
+
+ /**
+ * Generates a zebra specific sort key generator which is used to generate BytesWritable key
+ * Sort Key(s) are used to generate this object
+ *
+ * @param conf
+ * The JobConf object.
+ * @return Object of type zebra.pig.comparator.KeyGenerator.
+ *
+ */
+ public static Object getSortKeyGenerator(JobConf conf) throws IOException, ParseException {
+
+ SortInfo sortInfo = getSortInfo(conf);
+ Schema schema = getSchema(conf);
+ String[] sortColNames = sortInfo.getSortColumnNames();
+
+ byte[] types = new byte[sortColNames.length];
+ for(int i =0 ; i < sortColNames.length; ++i){
+ types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+ }
+ KeyGenerator builder = makeKeyBuilder(types);
+ return builder;
+
+ }
+
+
+ /**
+ * Generates a BytesWritable key for the input tuple
+ * using the key generator provided. Sort Key(s) are used to generate this object
+ *
+ * @param builder
+ * Opaque key generator created by getSortKeyGenerator() method
+ * @param t
+ * Tuple to create sort key from
+ * @return BytesWritable key
+ *
+ */
+ public static BytesWritable getSortKey(Object builder, Tuple t) throws Exception {
+ KeyGenerator kg = (KeyGenerator) builder;
+ return kg.generateKey(t);
+ }
+
+
+
+
/**
* Set the table storage hint in JobConf, should be called after setSchema is
* called.
@@ -194,7 +252,7 @@
throw new ParseException("Schema has not been set");
// for sanity check purpose only
- Partition partition = new Partition(schema, storehint);
+ Partition partition = new Partition(schema, storehint, null);
conf.set(OUTPUT_STORAGEHINT, storehint);
}
@@ -214,34 +272,84 @@
}
/**
- * Set sorted-ness of the table. It is disabled now (by making it package
- * private). So only unsorted BasicTables may be created for now.
- *
- * TODO: must also allow users to specify customized comparator.
+ * Set the sort info
+ *
+ * @param conf
+ * The JobConf object.
+ *
+ * @param sortColumns
+ * Comma-separated sort column names
+ *
+ * @param comparator
+ * comparator class name; null for default
+ *
+ */
+ public static void setSortInfo(JobConf conf, String sortColumns, String comparator) {
+ conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+ conf.set(OUTPUT_COMPARATOR, comparator);
+ }
+
+ /**
+ * Set the sort info
+ *
+ * @param conf
+ * The JobConf object.
+ *
+ * @param sortColumns
+ * Comma-separated sort column names
*/
- public static void setSorted(JobConf conf, boolean sorted) {
- conf.setBoolean(OUTPUT_SORTED, sorted);
+ public static void setSortInfo(JobConf conf, String sortColumns) {
+ conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+ }
+
+ /**
+ * Get the SortInfo object
+ *
+ * @param conf
+ * The JobConf object.
+ * @return SortInfo object; null if the Zebra table is unsorted
+ *
+ */
+ public static SortInfo getSortInfo(JobConf conf)throws IOException
+ {
+ String sortColumns = conf.get(OUTPUT_SORTCOLUMNS);
+ if (sortColumns == null)
+ return null;
+ Schema schema = null;
+ try {
+ schema = getSchema(conf);
+ } catch (ParseException e) {
+ throw new IOException("Schema parsing failure : "+e.getMessage());
+ }
+ if (schema == null)
+ throw new IOException("Schema not defined");
+ String comparator = getComparator(conf);
+ return SortInfo.parse(sortColumns, schema, comparator);
}
/**
- * Is the table to be created should be sorted? It is disabled now (by making
- * it package private).
+ * Get the comparator for sort columns
+ *
+ * @param conf
+ * The JobConf object.
+ * @return comparator String
+ *
*/
- static boolean getSorted(JobConf conf) {
- return conf.getBoolean(OUTPUT_SORTED, false);
+ private static String getComparator(JobConf conf)
+ {
+ return conf.get(OUTPUT_COMPARATOR);
}
/**
* Get the output table as specified in JobConf. It is useful for applications
* to add more meta data after all rows have been added to the table.
- * Currently it is disabled (by setting it to package private).
*
* @param conf
* The JobConf object.
* @return The output BasicTable.Writer object.
* @throws IOException
*/
- public static BasicTable.Writer getOutput(JobConf conf) throws IOException {
+ private static BasicTable.Writer getOutput(JobConf conf) throws IOException {
String path = conf.get(OUTPUT_PATH);
if (path == null) {
throw new IllegalArgumentException("Cannot find output path");
@@ -268,16 +376,17 @@
if (schema == null) {
throw new IllegalArgumentException("Cannot find output schema");
}
- String storehint;
+ String storehint, sortColumns, comparator;
try {
storehint = getStorageHint(conf);
+ sortColumns = (getSortInfo(conf) == null ? null : SortInfo.toSortString(getSortInfo(conf).getSortColumnNames()));
+ comparator = getComparator(conf);
}
catch (ParseException e) {
throw new IOException(e);
}
BasicTable.Writer writer =
- new BasicTable.Writer(new Path(path), schema, storehint,
- getSorted(conf), conf); // will
+ new BasicTable.Writer(new Path(path), schema, storehint, sortColumns, comparator, conf);
writer.finish();
}
@@ -299,14 +408,13 @@
/**
* Close the output BasicTable, No more rows can be added into the table. A
- * BasicTable is not visible for reading until it is "closed". This call is
- * required for sorted TFile, but not required for unsorted TFile.
+ * BasicTable is not visible for reading until it is "closed".
*
* @param conf
* The JobConf object.
* @throws IOException
*/
- static void close(JobConf conf) throws IOException {
+ public static void close(JobConf conf) throws IOException {
BasicTable.Writer table = getOutput(conf);
table.close();
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Thu Nov 5 21:02:57 2009
@@ -34,6 +34,7 @@
* Table Expression - expression to describe an input table.
*/
abstract class TableExpr {
+ private boolean sorted = false;
/**
* Factory method to create a TableExpr from a string.
*
@@ -179,7 +180,14 @@
* @return Whether this expression may only be split by key.
*/
public boolean sortedSplitRequired() {
- return false;
+ return sorted;
+ }
+
+ /**
+ * Set the requirement for sorted table
+ */
+ public void setSortedSplit() {
+ sorted = true;
}
/**
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Thu Nov 5 21:02:57 2009
@@ -24,7 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableUtils;
@@ -46,6 +46,8 @@
import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.mapred.TableExpr.LeafTableInfo;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -133,6 +135,7 @@
public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
private static final String INPUT_EXPR = "mapred.lib.table.input.expr";
private static final String INPUT_PROJ = "mapred.lib.table.input.projection";
+ private static final String INPUT_SORT = "mapred.lib.table.input.sort";
/**
* Set the paths to the input table.
@@ -158,6 +161,45 @@
setInputExpr(conf, expr);
}
}
+
+ //This method escapes commas in the glob pattern of the given paths.
+ private static String[] getPathStrings(String commaSeparatedPaths) {
+ int length = commaSeparatedPaths.length();
+ int curlyOpen = 0;
+ int pathStart = 0;
+ boolean globPattern = false;
+ List<String> pathStrings = new ArrayList<String>();
+
+ for (int i=0; i<length; i++) {
+ char ch = commaSeparatedPaths.charAt(i);
+ switch(ch) {
+ case '{' : {
+ curlyOpen++;
+ if (!globPattern) {
+ globPattern = true;
+ }
+ break;
+ }
+ case '}' : {
+ curlyOpen--;
+ if (curlyOpen == 0 && globPattern) {
+ globPattern = false;
+ }
+ break;
+ }
+ case ',' : {
+ if (!globPattern) {
+ pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+ pathStart = i + 1 ;
+ }
+ break;
+ }
+ }
+ }
+ pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+
+ return pathStrings.toArray(new String[0]);
+ }
/**
* Set the input expression in the JobConf object.
@@ -181,6 +223,18 @@
StringReader in = new StringReader(expr);
return TableExpr.parse(in);
}
+
+ /**
+ * Get the schema of a table expr
+ *
+ * @param conf
+ * JobConf object.
+ */
+ public static Schema getSchema(JobConf conf) throws IOException
+ {
+ TableExpr expr = getInputExpr(conf);
+ return expr.getSchema(conf);
+ }
/**
* Set the input projection in the JobConf object.
@@ -220,15 +274,171 @@
}
/**
+ * Set requirement for sorted table
+ *
+ *@param conf
+ * JobConf object.
+ */
+ private static void setSorted(JobConf conf) {
+ conf.setBoolean(INPUT_SORT, true);
+ }
+
+ /**
+ * Get the SortInfo object regarding a Zebra table
+ *
+ * @param conf
+ * JobConf object
+ * @return the zebra table's SortInfo; null if the table is unsorted.
+ */
+ public static SortInfo getSortInfo(JobConf conf) throws IOException
+ {
+ TableExpr expr = getInputExpr(conf);
+ SortInfo result = null;
+ int sortSize = 0;
+ if (expr instanceof BasicTableExpr)
+ {
+ BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ result = sortInfo;
+ } else {
+ List<LeafTableInfo> leaves = expr.getLeafTables(null);
+ for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+ {
+ LeafTableInfo leaf = it.next();
+ BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ if (sortSize == 0)
+ {
+ sortSize = sortInfo.size();
+ result = sortInfo;
+ } else if (sortSize != sortInfo.size()) {
+ throw new IOException("Tables of the table union do not possess the same sort property.");
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Requires sorted table or table union
+ *
+ * @param conf
+ * JobConf object.
+ * @param sortcolumns
+ * Sort column names.
+ * @param comparator
+ * Comparator name. Null means the caller does not care but table union will check
+ * identical comparators regardless as a minimum sanity check
+ *
+ */
+ public static void requireSortedTable(JobConf conf, String sortcolumns, String comparator) throws IOException {
+ TableExpr expr = getInputExpr(conf);
+ if (expr instanceof BasicTableExpr)
+ {
+ BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ if (comparator == null && sortInfo != null)
+ // cheat the equals method's comparator comparison
+ comparator = sortInfo.getComparator();
+ if (sortInfo == null || !sortInfo.equals(sortcolumns, comparator))
+ {
+ throw new IOException("The table is not (properly) sorted");
+ }
+ } else {
+ List<LeafTableInfo> leaves = expr.getLeafTables(null);
+ for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+ {
+ LeafTableInfo leaf = it.next();
+ BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ if (comparator == null && sortInfo != null)
+ comparator = sortInfo.getComparator(); // use the first table's comparator as comparison base
+ if (sortInfo == null || !sortInfo.equals(sortcolumns, comparator))
+ {
+ throw new IOException("The table is not (properly) sorted");
+ }
+ }
+ // need key range input splits for sorted table union
+ setSorted(conf);
+ }
+ }
+
+ /**
+ * Requires sorted table or table union. For table union, leading sort columns
+ * of component tables need to be the same
+ *
+ * @param conf
+ * JobConf object.
+ *
+ */
+ public static void requireSortedTable(JobConf conf) throws IOException {
+ TableExpr expr = getInputExpr(conf);
+ if (expr instanceof BasicTableExpr)
+ {
+ BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ if (sortInfo == null)
+ {
+ throw new IOException("The table is not (properly) sorted");
+ }
+ } else {
+ List<LeafTableInfo> leaves = expr.getLeafTables(null);
+ String sortcolumns = null, comparator = null;
+ for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+ {
+ LeafTableInfo leaf = it.next();
+ BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ if (sortInfo == null)
+ {
+ throw new IOException("The table is not (properly) sorted");
+ }
+ // check the compatible sort info among the member tables of a union
+ if (sortcolumns == null)
+ {
+ sortcolumns = SortInfo.toSortString(sortInfo.getSortColumnNames());
+ comparator = sortInfo.getComparator();
+ } else {
+ if (!sortInfo.equals(sortcolumns, comparator))
+ {
+ throw new IOException("The table is not (properly) sorted");
+ }
+ }
+ }
+ // need key range input splits for sorted table union
+ setSorted(conf);
+ }
+ }
+ /**
+ * Get the requirement for sorted table
+ *
+ *@param conf
+ * JobConf object.
+ */
+ private static boolean getSorted(JobConf conf) {
+ return conf.getBoolean(INPUT_SORT, false);
+ }
+
+ /**
* @see InputFormat#getRecordReader(InputSplit, JobConf, Reporter)
*/
@Override
public RecordReader<BytesWritable, Tuple> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {
- TableExpr expr = getInputExpr(conf);
+ TableExpr expr = getInputExpr(conf);
if (expr == null) {
throw new IOException("Table expression not defined");
}
+
+ if (getSorted(conf))
+ expr.setSortedSplit();
+
String strProj = conf.get(INPUT_PROJ);
String projection = null;
try {
@@ -241,16 +451,37 @@
} catch (ParseException e) {
throw new IOException("Projection parsing failed : "+e.getMessage());
}
+
try {
return new TableRecordReader(expr, projection, split, conf);
} catch (ParseException e) {
throw new IOException("Projection parsing faile : "+e.getMessage());
}
}
+
+ /**
+ * Get a TableRecordReader on a single split
+ *
+ * @param conf
+ * JobConf object.
+ * @param projection
+ * comma-separated column names in projection. null means all columns in projection
+ */
+
+ public TableRecordReader getTableRecordReader(JobConf conf, String projection) throws IOException, ParseException
+ {
+ // a single split is needed
+ if (projection != null)
+ setProjection(conf, projection);
+ TableInputFormat inputFormat = new TableInputFormat();
+ InputSplit[] splits = inputFormat.getSplits(conf, 1);
+ return (TableRecordReader) getRecordReader(splits[0], conf, Reporter.NULL);
+ }
private static InputSplit[] getSortedSplits(JobConf conf, int numSplits,
TableExpr expr, List<BasicTable.Reader> readers,
List<BasicTableStatus> status) throws IOException {
+
if (expr.sortedSplitRequired() && !expr.sortedSplitCapable()) {
throw new IOException("Unable to created sorted splits");
}
@@ -410,6 +641,8 @@
public InputSplit[] getSplits(JobConf conf, int numSplits)
throws IOException {
TableExpr expr = getInputExpr(conf);
+ if (getSorted(conf))
+ expr.setSortedSplit();
if (expr.sortedSplitRequired() && !expr.sortedSplitCapable()) {
throw new IOException("Unable to created sorted splits");
}
@@ -542,6 +775,12 @@
if (bool == 1) {
end.readFields(in);
}
+ length = WritableUtils.readVLong(in);
+ int size = WritableUtils.readVInt(in);
+ if (size > 0)
+ hosts = new String[size];
+ for (int i = 0; i < size; i++)
+ hosts[i] = WritableUtils.readString(in);
}
@Override
@@ -560,6 +799,12 @@
WritableUtils.writeVInt(out, 1);
end.write(out);
}
+ WritableUtils.writeVLong(out, length);
+ WritableUtils.writeVInt(out, hosts == null ? 0 : hosts.length);
+ for (int i = 0; i < hosts.length; i++)
+ {
+ WritableUtils.writeString(out, hosts[i]);
+ }
}
public BytesWritable getBegin() {
@@ -644,79 +889,3 @@
return split;
}
}
-
-/**
- * Adaptor class to implement RecordReader on top of Scanner.
- */
-class TableRecordReader implements RecordReader<BytesWritable, Tuple> {
- private final TableScanner scanner;
- private long count = 0;
-
- /**
- *
- * @param expr
- * @param projection
- * projection schema. Should never be null.
- * @param split
- * @param conf
- * @throws IOException
- */
- public TableRecordReader(TableExpr expr, String projection,
- InputSplit split,
- JobConf conf) throws IOException, ParseException {
- if (expr.sortedSplitRequired()) {
- SortedTableSplit tblSplit = (SortedTableSplit) split;
- scanner =
- expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
- conf);
- }
- else {
- UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
- scanner = expr.getScanner(tblSplit, projection, conf);
- }
- }
-
- @Override
- public void close() throws IOException {
- scanner.close();
- }
-
- @Override
- public BytesWritable createKey() {
- return new BytesWritable();
- }
-
- @Override
- public Tuple createValue() {
- try {
- return TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
- }
- catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return null;
- }
-
- @Override
- public long getPos() throws IOException {
- return count;
- }
-
- @Override
- public float getProgress() throws IOException {
- return (float) ((scanner.atEnd()) ? 1.0 : 0);
- }
-
- @Override
- public boolean next(BytesWritable key, Tuple value) throws IOException {
- if (scanner.atEnd()) {
- return false;
- }
- scanner.getKey(key);
- scanner.getValue(value);
- scanner.advance();
- ++count;
- return true;
- }
-}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Adaptor class to implement RecordReader on top of Scanner.
+ */
+public class TableRecordReader implements RecordReader<BytesWritable, Tuple> {
+ private final TableScanner scanner;
+ private long count = 0;
+
+ /**
+ *
+ * @param expr
+ * Table expression
+ * @param projection
+ * projection schema. Should never be null.
+ * @param split
+ * the split to work on
+ * @param conf
+ * JobConf object
+ * @throws IOException
+ */
+ public TableRecordReader(TableExpr expr, String projection,
+ InputSplit split,
+ JobConf conf) throws IOException, ParseException {
+ if (expr.sortedSplitRequired()) {
+ SortedTableSplit tblSplit = (SortedTableSplit) split;
+ scanner =
+ expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
+ conf);
+ }
+ else {
+ UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
+ scanner = expr.getScanner(tblSplit, projection, conf);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ @Override
+ public BytesWritable createKey() {
+ return new BytesWritable();
+ }
+
+ @Override
+ public Tuple createValue() {
+ try {
+ return TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+ }
+ catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return count;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return (float) ((scanner.atEnd()) ? 1.0 : 0);
+ }
+
+ @Override
+ public boolean next(BytesWritable key, Tuple value) throws IOException {
+ if (scanner.atEnd()) {
+ return false;
+ }
+ scanner.getKey(key);
+ scanner.getValue(value);
+ scanner.advance();
+ ++count;
+ return true;
+ }
+
+ /**
+ * Seek to the position at the first row which has the key
+ * or just after the key; only applicable for sorted Zebra table
+ *
+ * @param key
+ * the key to seek on
+ */
+ public boolean seekTo(BytesWritable key) throws IOException {
+ return scanner.seekTo(key);
+ }
+
+ /**
+ * Check if the end of the input has been reached
+ *
+ * @return true if the end of the input is reached
+ */
+ public boolean atEnd() throws IOException {
+ return scanner.atEnd();
+ }
+}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Thu Nov 5 21:02:57 2009
@@ -162,6 +162,7 @@
CachedTableScanner[] scanners;
PriorityBlockingQueue<CachedTableScanner> queue;
boolean synced = false;
+ CachedTableScanner scanner = null; // the working scanner
SortedTableUnionScanner(List<TableScanner> scanners) throws IOException {
if (scanners.isEmpty()) {
@@ -188,6 +189,9 @@
TableScanner scanner = scanners.get(i);
this.scanners[i] = new CachedTableScanner(scanner);
}
+ // initial fill-ins
+ if (!atEnd())
+ scanner = queue.poll();
}
@@ -206,21 +210,18 @@
@Override
public boolean advance() throws IOException {
sync();
- CachedTableScanner scanner = queue.poll();
- if (scanner != null) {
- scanner.advance();
- if (!scanner.atEnd()) {
- queue.add(scanner);
- }
- return true;
+ scanner.advance();
+ if (!scanner.atEnd()) {
+ queue.add(scanner);
}
- return false;
+ scanner = queue.poll();
+ return (scanner != null);
}
@Override
public boolean atEnd() throws IOException {
sync();
- return queue.isEmpty();
+ return (scanner == null && queue.isEmpty());
}
@Override
@@ -239,7 +240,6 @@
throw new EOFException("No more rows to read");
}
- CachedTableScanner scanner = queue.poll();
key.set(scanner.getKey());
}
@@ -249,7 +249,6 @@
throw new EOFException("No more rows to read");
}
- CachedTableScanner scanner = queue.poll();
row.reference(scanner.getValue());
}
@@ -260,6 +259,8 @@
rv = rv || scanner.seekTo(key);
}
synced = false;
+ if (!atEnd())
+ scanner = queue.poll();
return rv;
}
@@ -268,6 +269,7 @@
for (CachedTableScanner scanner : scanners) {
scanner.seekToEnd();
}
+ scanner = null;
synced = false;
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java Thu Nov 5 21:02:57 2009
@@ -18,7 +18,7 @@
/**
* Providing {@link org.apache.hadoop.mapred.InputFormat} and
* {@link org.apache.hadoop.mapred.OutputFormat} adaptor classes for Hadoop
- * Table.
+ * Zebra Table.
*/
package org.apache.hadoop.zebra.mapred;
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java Thu Nov 5 21:02:57 2009
@@ -32,7 +32,7 @@
* represent row keys, and PIG {@link org.apache.pig.data.Tuple} objects to
* represent rows.
* <p>
- * Each BasicTable maintains a {@link org.apache.hadoop.zebra.types.Schema} ,
+ * Each BasicTable maintains a {@link org.apache.hadoop.zebra.schema.Schema} ,
* which, for this release, is nothing but a collection of column names. Given a
* schema, we can deduce the integer index of a particular column, and use it to
* extract (get) the desired datum from PIG Tuple object (which only allows
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Thu Nov 5 21:02:57 2009
@@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-//import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,11 +40,14 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.mapred.TableRecordReader;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
@@ -57,16 +59,25 @@
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.hadoop.zebra.pig.comparator.*;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.hadoop.zebra.io.TableScanner;
/**
- * Pig LoadFunc and Slicer for Table
+ * Pig IndexableLoadFunc and Slicer for Zebra Table
*/
-public class TableLoader implements LoadFunc, Slicer {
+public class TableLoader implements IndexableLoadFunc, Slicer {
static final Log LOG = LogFactory.getLog(TableLoader.class);
private TableInputFormat inputFormat;
private JobConf jobConf;
private String projectionString;
private Path[] paths;
+ private TableRecordReader indexReader = null;
+ private BytesWritable indexKey = null;
+ private Tuple tuple;
+ private org.apache.hadoop.zebra.schema.Schema schema;
+ private SortInfo sortInfo;
+ private boolean sorted = false;
/**
* default constructor
@@ -84,12 +95,103 @@
projectionString = projectionStr;
}
+ /**
+ * @param projectionStr
+ * projection string passed from pig query.
+ * @param sorted
+ * need sorted table(s)?
+ */
+ public TableLoader(String projectionStr, String sorted) throws IOException {
+ inputFormat = new TableInputFormat();
+ if (projectionStr != null && !projectionStr.isEmpty())
+ projectionString = projectionStr;
+ if (sorted.equalsIgnoreCase("sorted"))
+ this.sorted = true;
+ else
+ throw new IOException("Invalid argument to the table loader constructor: "+sorted);
+ }
+
@Override
- public void bindTo(String fileName, BufferedPositionedInputStream is,
- long offset, long end) throws IOException {
- throw new IOException("Not implemented");
+ public void initialize(Configuration conf) throws IOException
+ {
+ if (conf == null)
+ throw new IOException("Null Configuration passed.");
+ jobConf = new JobConf(conf);
}
+
+ @Override
+ public void bindTo(String filePaths, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+
+ FileInputFormat.setInputPaths(jobConf, filePaths);
+ Path[] paths = FileInputFormat.getInputPaths(jobConf);
+ /**
+ * Performing glob pattern matching
+ */
+ List<Path> result = new ArrayList<Path>(paths.length);
+ for (Path p : paths) {
+ FileSystem fs = p.getFileSystem(jobConf);
+ FileStatus[] matches = fs.globStatus(p);
+ if (matches == null) {
+ LOG.warn("Input path does not exist: " + p);
+ }
+ else if (matches.length == 0) {
+ LOG.warn("Input Pattern " + p + " matches 0 files");
+ } else {
+ for (FileStatus globStat: matches) {
+ if (globStat.isDir()) {
+ result.add(globStat.getPath());
+ } else {
+ LOG.warn("Input path " + p + " is not a directory");
+ }
+ }
+ }
+ }
+ if (result.isEmpty()) {
+ throw new IOException("No table specified for input");
+ }
+ TableInputFormat.setInputPaths(jobConf, result.toArray(new Path[result.size()]));
+ TableInputFormat.requireSortedTable(jobConf);
+ sortInfo = TableInputFormat.getSortInfo(jobConf);
+ schema = TableInputFormat.getSchema(jobConf);
+ int numcols = schema.getNumColumns();
+ tuple = TypesUtils.createTuple(numcols);
+
+ /*
+ * Use all columns of a table as a projection: not an optimal approach
+ * No need to call TableInputFormat.setProjection: by default use all columns
+ */
+ try {
+ indexReader = inputFormat.getTableRecordReader(jobConf, null);
+ } catch (ParseException e) {
+ throw new IOException("Exception from TableInputFormat.getTableRecordReader: "+e.getMessage());
+ }
+ indexKey = new BytesWritable();
+ }
+
+ @Override
+ public void seekNear(Tuple t) throws IOException
+ {
+ // SortInfo sortInfo = inputFormat.getSortInfo(conf, path);
+ String[] sortColNames = sortInfo.getSortColumnNames();
+ byte[] types = new byte[sortColNames.length];
+ for(int i =0 ; i < sortColNames.length; ++i){
+ types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+ }
+ KeyGenerator builder = makeKeyBuilder(types);
+ BytesWritable key = builder.generateKey(t);
+// BytesWritable key = new BytesWritable(((String) t.get(0)).getBytes());
+ indexReader.seekTo(key);
+ }
+
+ private KeyGenerator makeKeyBuilder(byte[] elems) {
+ ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+ for (int i = 0; i < elems.length; ++i) {
+ exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+ }
+ return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+ }
/**
* @param storage
* @param location
@@ -145,6 +247,8 @@
} catch (ParseException e) {
throw new RuntimeException("Schema parsing failed : "+e.getMessage());
}
+ if (sorted)
+ TableInputFormat.requireSortedTable(jobConf);
}
}
@@ -215,17 +319,27 @@
@Override
public Tuple getNext() throws IOException {
- throw new IOException("Not implemented");
+ if (indexReader.atEnd())
+ return null;
+ indexReader.next(indexKey, tuple);
+ return tuple;
}
+ @Override
+ public void close() throws IOException {
+ if (indexReader != null)
+ indexReader.close();
+ }
+
@Override
public Slice[] slice(DataStorage store, String location) throws IOException {
checkConf(store, location);
// TableInputFormat accepts numSplits < 0 (special case for no-hint)
InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
+
Slice[] slices = new Slice[splits.length];
for (int nx = 0; nx < slices.length; nx++) {
- slices[nx] = new TableSlice(jobConf, splits[nx]);
+ slices[nx] = new TableSlice(jobConf, splits[nx], sorted);
}
return slices;
@@ -244,11 +358,12 @@
private InputSplit split;
transient private JobConf conf;
- transient private String projection;
+ transient private int numProjCols = 0;
transient private RecordReader<BytesWritable, Tuple> scanner;
transient private BytesWritable key;
+ transient private boolean sorted = false;
- TableSlice(JobConf conf, InputSplit split) {
+ TableSlice(JobConf conf, InputSplit split, boolean sorted) {
// hack: expecting JobConf contains nothing but a <string, string>
// key-value pair store.
configMap = new TreeMap<String, String>();
@@ -257,6 +372,7 @@
configMap.put(e.getKey(), e.getValue());
}
this.split = split;
+ this.sorted = sorted;
}
@Override
@@ -315,20 +431,24 @@
localConf.set(e.getKey(), e.getValue());
}
conf = new JobConf(localConf);
+ String projection;
try
- {
- projection = TableInputFormat.getProjection(conf);
- } catch (ParseException e) {
- throw new IOException("Schema parsing failed :"+e.getMessage());
- }
+ {
+ projection = TableInputFormat.getProjection(conf);
+ } catch (ParseException e) {
+ throw new IOException("Schema parsing failed :"+e.getMessage());
+ }
+ numProjCols = Projection.getNumColumns(projection);
TableInputFormat inputFormat = new TableInputFormat();
+ if (sorted)
+ TableInputFormat.requireSortedTable(conf);
scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
key = new BytesWritable();
}
@Override
public boolean next(Tuple value) throws IOException {
- TypesUtils.formatTuple(value, projection);
+ TypesUtils.formatTuple(value, numProjCols);
return scanner.next(key, value);
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Thu Nov 5 21:02:57 2009
@@ -21,25 +21,37 @@
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
+import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
+import org.apache.hadoop.zebra.pig.comparator.ExprUtils;
+import org.apache.hadoop.zebra.pig.comparator.KeyGenerator;
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableInserter;
import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.pig.StoreConfig;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.CommittableStoreFunc;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
-public class TableStorer implements StoreFunc {
+/**
+ * Pig CommittableStoreFunc for Zebra Table
+ */
+public class TableStorer implements CommittableStoreFunc {
private String storageHintString;
public TableStorer() {
@@ -78,6 +90,18 @@
static public void main(String[] args) throws SecurityException, NoSuchMethodException {
Constructor meth = TableOutputFormat.class.getDeclaredConstructor(emptyArray);
}
+
+ /**
+ * Commits the store job: re-opens the table writer at the store location
+ * and closes it, which finalizes the table.
+ *
+ * @param conf job configuration carrying the Pig StoreConfig
+ * @throws IOException if the table cannot be opened or closed
+ */
+ @Override
+ public void commit(Configuration conf) throws IOException {
+ JobConf job = new JobConf(conf);
+ StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
+ // No try/catch needed: IOException propagates to the caller unchanged.
+ BasicTable.Writer writer = new BasicTable.Writer(new Path(storeConfig.getLocation()), job);
+ writer.close();
+ }
}
/**
@@ -91,6 +115,36 @@
StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
String location = storeConfig.getLocation(), schemaStr;
Schema schema = storeConfig.getSchema();
+ org.apache.pig.SortInfo pigSortInfo = storeConfig.getSortInfo();
+
+ /* TODO
+ * use a home-brewn comparator ??
+ */
+ String comparator = null;
+ String sortColumnNames = null;
+ if (pigSortInfo != null)
+ {
+ List<org.apache.pig.SortColInfo> sortColumns = pigSortInfo.getSortColInfoList();
+ StringBuilder sb = new StringBuilder();
+ if (sortColumns != null && sortColumns.size() >0)
+ {
+ org.apache.pig.SortColInfo sortColumn;
+ String sortColumnName;
+ for (int i = 0; i < sortColumns.size(); i++)
+ {
+ sortColumn = sortColumns.get(i);
+ sortColumnName = sortColumn.getColName();
+ if (sortColumnName == null)
+ throw new IOException("Zebra does not support column positional reference yet");
+ if (!org.apache.pig.data.DataType.isAtomic(schema.getField(sortColumnName).type))
+ throw new IOException(schema.getField(sortColumnName).alias+" is not of simple type as required for a sort column now.");
+ if (i > 0)
+ sb.append(",");
+ sb.append(sortColumnName);
+ }
+ sortColumnNames = sb.toString();
+ }
+ }
try {
schemaStr = SchemaConverter.fromPigSchema(schema).toString();
} catch (ParseException e) {
@@ -98,7 +152,7 @@
}
TableStorer storeFunc = (TableStorer)MapRedUtil.getStoreFunc(job);
BasicTable.Writer writer = new BasicTable.Writer(new Path(location),
- schemaStr, storeFunc.getStorageHintString(), false, job);
+ schemaStr, storeFunc.getStorageHintString(), sortColumnNames, comparator, job);
writer.finish();
}
@@ -118,6 +172,9 @@
final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
private BasicTable.Writer writer;
private TableInserter inserter;
+ private int[] sortColIndices = null;
+ KeyGenerator builder;
+ Tuple t;
public TableRecordWriter(String name, JobConf conf) throws IOException {
StoreConfig storeConfig = MapRedUtil.getStoreConfig(conf);
@@ -126,6 +183,22 @@
// TODO: how to get? 1) column group splits; 2) flag of sorted-ness,
// compression, etc.
writer = new BasicTable.Writer(new Path(location), conf);
+
+ if (writer.getSortInfo() != null)
+ {
+ sortColIndices = writer.getSortInfo().getSortIndices();
+ SortInfo sortInfo = writer.getSortInfo();
+ String[] sortColNames = sortInfo.getSortColumnNames();
+ org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
+
+ byte[] types = new byte[sortColNames.length];
+ for(int i =0 ; i < sortColNames.length; ++i){
+ types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+ }
+ t = TypesUtils.createTuple(sortColNames.length);
+ builder = makeKeyBuilder(types);
+ }
+
inserter = writer.getInserter(name, false);
}
@@ -135,9 +208,23 @@
writer.finish();
}
+ private KeyGenerator makeKeyBuilder(byte[] elems) {
+ ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+ for (int i = 0; i < elems.length; ++i) {
+ exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+ }
+ return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+ }
+
@Override
public void write(BytesWritable key, Tuple value) throws IOException {
- if (key == null) {
+ if (sortColIndices != null)
+ {
+ for(int i =0; i < sortColIndices.length;++i) {
+ t.set(i, value.get(sortColIndices[i]));
+ }
+ key = builder.generateKey(t);
+ } else if (key == null) {
key = KEY0;
}
inserter.insert(key, value);
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Comparator expression over a Pig DataBag: every tuple in the bag is
+ * encoded through a nested comparator expression so bag-valued columns
+ * can participate in binary key ordering.
+ */
+public class BagExpr extends LeafExpr {
+ // Per-tuple leaf generators; built lazily on the first appendObject call.
+ protected List<LeafGenerator> list;
+ // Nested comparator expression applied to each tuple of the bag.
+ protected final ComparatorExpr expr;
+ // Escape level and complement-escape level captured from the output stream.
+ protected int el, cel;
+ // Complement flag captured from the output stream.
+ protected boolean c;
+
+ public BagExpr(int index, ComparatorExpr expr) {
+ super(index);
+ this.expr = expr;
+ }
+
+ /**
+ * Encodes all tuples of the bag into the stream, applying every leaf
+ * generator of the nested expression to each tuple in order.
+ */
+ @Override
+ protected
+ void appendObject(EncodingOutputStream out, Object object)
+ throws ExecException {
+
+ if (list == null) {
+ // This is the first time we get called. build the execution plan.
+ el = out.getEscapeLevel();
+ cel = out.getComescLevel();
+ c = out.getComplement();
+ list = new ArrayList<LeafGenerator>();
+ // requiring the individual items to be explicitly bounded.
+ expr.appendLeafGenerator(list, el, cel, c, true);
+ }
+
+ DataBag bag = (DataBag) object;
+ for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ for (Iterator<LeafGenerator> it2 = list.iterator(); it2.hasNext();) {
+ LeafGenerator g = it2.next();
+ g.append(out, t);
+ }
+ }
+ }
+
+ @Override
+ protected
+ boolean implicitBound() {
+ return true;
+ }
+
+ // Prints a human-readable description of this expression's leaf plan.
+ public void illustrate(PrintStream out, int escapeLevel, int comescLevel,
+ boolean complement) {
+ List<LeafGenerator> l = new ArrayList<LeafGenerator>();
+ expr.appendLeafGenerator(l, escapeLevel, comescLevel, complement, true);
+ out.printf("(%s, %d, [", getType(), index);
+ for (Iterator<LeafGenerator> it = l.iterator(); it.hasNext();) {
+ LeafGenerator leaf = it.next();
+ leaf.illustrate(out);
+ }
+ out.print("])");
+ }
+
+ @Override
+ protected
+ String getType() {
+ return "Bag";
+ }
+
+ @Override
+ protected
+ void toString(PrintStream out) {
+ out.printf("%s(%d, ", getType(), index);
+ expr.toString(out);
+ out.print(")");
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+/**
+ * Fixed-length (1-byte) comparator expression for Pig boolean values.
+ */
+public final class BooleanExpr extends FixedLengthPrimitive {
+ public BooleanExpr(int index) {
+ super(index, 1);
+ }
+
+ /**
+ * Encodes the boolean into the single-byte buffer: true -> 3, false -> 2.
+ *
+ * @param b destination buffer, must be exactly 1 byte long
+ * @param o boxed Boolean value to encode
+ */
+ protected void convertValue(byte[] b, Object o) {
+ if (b.length != 1) {
+ throw new IllegalArgumentException("expecting array length == 1");
+ }
+ boolean l = (Boolean) o;
+ // Avoid using 0, 1, which may require escaping later.
+ b[0] = (byte) (l ? 3 : 2);
+ }
+
+ @Override
+ protected String getType() {
+ return "Boolean";
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+/**
+ * Fixed-length (1-byte) comparator expression for Pig byte values.
+ */
+public final class ByteExpr extends FixedLengthPrimitive {
+ public ByteExpr(int index) {
+ super(index, 1);
+ }
+
+ /**
+ * Encodes the byte with its sign bit flipped, so that signed values
+ * compare correctly under unsigned (lexicographic) byte comparison.
+ *
+ * @param b destination buffer, must be exactly 1 byte long
+ * @param o boxed Byte value to encode
+ */
+ protected void convertValue(byte[] b, Object o) {
+ if (b.length != 1) {
+ throw new IllegalArgumentException("Expecting length==1");
+ }
+ byte l = (Byte) o;
+ // XOR the sign bit: maps signed ordering onto unsigned byte ordering.
+ l ^= 1 << (Byte.SIZE - 1);
+ b[0] = l;
+ }
+
+ @Override
+ protected String getType() {
+ return "Byte";
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import org.apache.pig.data.DataByteArray;
+
+/**
+ * Variable-length comparator expression for Pig DataByteArray values.
+ */
+public final class BytesExpr extends LeafExpr {
+ public BytesExpr(int index) {
+ super(index);
+ }
+
+ // Writes the raw bytes of the DataByteArray to the encoding stream.
+ @Override
+ protected void appendObject(EncodingOutputStream out, Object o) {
+ DataByteArray bytes = (DataByteArray) o;
+ out.write(bytes.get());
+ }
+
+ // Marks this leaf's encoding as implicitly bounded (variable length).
+ @Override
+ protected boolean implicitBound() {
+ return true;
+ }
+
+ @Override
+ protected String getType() {
+ return "ByteArray";
+ }
+}