You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/10/21 16:17:55 UTC
svn commit: r828027 - in /hadoop/pig/trunk/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/pig/ src/test/org/apache/hadoop/zebra/pig/
Author: gates
Date: Wed Oct 21 14:17:54 2009
New Revision: 828027
URL: http://svn.apache.org/viewvc?rev=828027&view=rev
Log:
PIG-944 Change schema to be taken from StoreConfig instead of TableStorer's constructor.
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Wed Oct 21 14:17:54 2009
@@ -15,6 +15,9 @@
BUG FIXES
+ PIG-944 Change schema to be taken from StoreConfig instead of
+ TableStorer's constructor (yanz via gates).
+
PIG-918. Fix infinite loop when only columns in first column group are
specified. (Yan Zhou via rangadi)
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java Wed Oct 21 14:17:54 2009
@@ -21,72 +21,52 @@
import java.util.Iterator;
import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.ColumnType;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-/**
- * A simple schema converter that only understands three field types.
- */
class SchemaConverter {
- enum FieldSchemaMaker {
- SimpleField("SF_") {
- @Override
- FieldSchema toFieldSchema(String name) {
- return new FieldSchema(name, DataType.BYTEARRAY);
- }
- },
-
- MapField("MF_") {
- @Override
- FieldSchema toFieldSchema(String name) {
- // TODO: how to convey key and value types?
- return new FieldSchema(name, DataType.MAP);
- }
- },
-
- MapListField("MLF_") {
- @Override
- FieldSchema toFieldSchema(String name) throws FrontendException {
- Schema tupleSchema = new Schema();
- tupleSchema.add(MapField.toFieldSchema(null));
- tupleSchema.setTwoLevelAccessRequired(true);
- return new FieldSchema(name, tupleSchema, DataType.BAG);
- }
- };
-
- private String prefix;
-
- FieldSchemaMaker(String prefix) {
- this.prefix = prefix;
- }
-
- abstract FieldSchema toFieldSchema(String name) throws FrontendException;
-
- public static FieldSchema makeFieldSchema(String colname)
- throws FrontendException {
- for (FieldSchemaMaker e : FieldSchemaMaker.values()) {
- if (colname.startsWith(e.prefix)) {
- return e.toFieldSchema(colname.substring(e.prefix.length()));
- }
- }
- throw new FrontendException("Cannot determine type from column name");
- }
-
- public static String makeColumnName(FieldSchema fs)
- throws FrontendException {
- if (fs.alias == null) {
- throw new FrontendException("No alias provided for field schema");
- }
- for (FieldSchemaMaker e : FieldSchemaMaker.values()) {
- FieldSchema expected = e.toFieldSchema("dummy");
- if (FieldSchema.equals(fs, expected, false, true)) {
- return e.prefix + fs.alias;
- }
- }
- throw new FrontendException("Unsupported field schema");
+ public static ColumnType toTableType(byte ptype)
+ {
+ ColumnType ret;
+ switch (ptype) {
+ case DataType.INTEGER:
+ ret = ColumnType.INT;
+ break;
+ case DataType.LONG:
+ ret = ColumnType.LONG;
+ break;
+ case DataType.FLOAT:
+ ret = ColumnType.FLOAT;
+ break;
+ case DataType.DOUBLE:
+ ret = ColumnType.DOUBLE;
+ break;
+ case DataType.BOOLEAN:
+ ret = ColumnType.BOOL;
+ break;
+ case DataType.BAG:
+ ret = ColumnType.COLLECTION;
+ break;
+ case DataType.MAP:
+ ret = ColumnType.MAP;
+ break;
+ case DataType.TUPLE:
+ ret = ColumnType.RECORD;
+ break;
+ case DataType.CHARARRAY:
+ ret = ColumnType.STRING;
+ break;
+ case DataType.BYTEARRAY:
+ ret = ColumnType.BYTES;
+ break;
+ default:
+ ret = null;
+ break;
}
+ return ret;
}
public static Schema toPigSchema(
@@ -97,24 +77,39 @@
org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema =
tschema.getColumn(col);
if (columnSchema != null) {
- ret.add(new FieldSchema(col, columnSchema.getType().pigDataType()));
+ ColumnType ct = columnSchema.getType();
+ if (ct == org.apache.hadoop.zebra.schema.ColumnType.RECORD ||
+ ct == org.apache.hadoop.zebra.schema.ColumnType.COLLECTION)
+ ret.add(new FieldSchema(col, toPigSchema(columnSchema.getSchema()), ct.pigDataType()));
+ else
+ ret.add(new FieldSchema(col, ct.pigDataType()));
} else {
ret.add(new FieldSchema(null, null));
}
}
-
return ret;
}
public static org.apache.hadoop.zebra.schema.Schema fromPigSchema(
Schema pschema) throws FrontendException, ParseException {
- String[] colnames = new String[pschema.size()];
- int i = 0;
- for (Iterator<FieldSchema> it = pschema.getFields().iterator(); it
- .hasNext(); ++i) {
- FieldSchema fs = it.next();
- colnames[i] = FieldSchemaMaker.makeColumnName(fs);
+ org.apache.hadoop.zebra.schema.Schema tschema = new org.apache.hadoop.zebra.schema.Schema();
+ Schema.FieldSchema columnSchema;
+ for (int i = 0; i < pschema.size(); i++) {
+ columnSchema = pschema.getField(i);
+ if (columnSchema != null) {
+ if (DataType.isSchemaType(columnSchema.type))
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias,
+ fromPigSchema(columnSchema.schema), toTableType(columnSchema.type)));
+ else if (columnSchema.type == DataType.MAP)
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias,
+ new org.apache.hadoop.zebra.schema.Schema(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null,
+ org.apache.hadoop.zebra.schema.ColumnType.BYTES)), toTableType(columnSchema.type)));
+ else
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, toTableType(columnSchema.type)));
+ } else {
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, ColumnType.ANY));
+ }
}
- return new org.apache.hadoop.zebra.schema.Schema(colnames);
+ return tschema;
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Wed Oct 21 14:17:54 2009
@@ -35,18 +35,17 @@
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
public class TableStorer implements StoreFunc {
- private String schemaString;
private String storageHintString;
public TableStorer() {
}
- public TableStorer(String schemaStr, String storageHintStr) throws ParseException, IOException {
- schemaString = schemaStr;
+ public TableStorer(String storageHintStr) throws ParseException, IOException {
storageHintString = storageHintStr;
}
@@ -69,10 +68,6 @@
public Class getStorePreparationClass() throws IOException {
return TableOutputFormat.class;
}
-
- public String getSchemaString() {
- return schemaString;
- }
public String getStorageHintString() {
return storageHintString;
@@ -94,10 +89,16 @@
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
- String location = storeConfig.getLocation();
+ String location = storeConfig.getLocation(), schemaStr;
+ Schema schema = storeConfig.getSchema();
+ try {
+ schemaStr = SchemaConverter.fromPigSchema(schema).toString();
+ } catch (ParseException e) {
+ throw new IOException("Exception thrown from SchemaConverter: " + e.getMessage());
+ }
TableStorer storeFunc = (TableStorer)MapRedUtil.getStoreFunc(job);
BasicTable.Writer writer = new BasicTable.Writer(new Path(location),
- storeFunc.getSchemaString(), storeFunc.getStorageHintString(), false, job);
+ schemaStr, storeFunc.getStorageHintString(), false, job);
writer.finish();
}
@@ -134,11 +135,11 @@
writer.finish();
}
- @Override
- public void write(BytesWritable key, Tuple value) throws IOException {
- if (key == null) {
- key = KEY0;
- }
- inserter.insert(key, value);
- }
+ @Override
+ public void write(BytesWritable key, Tuple value) throws IOException {
+ if (key == null) {
+ key = KEY0;
+ }
+ inserter.insert(key, value);
+ }
}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java Wed Oct 21 14:17:54 2009
@@ -143,6 +143,6 @@
pigServer.store("records", new Path(pathTable, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('c:collection(a:double, b:float, c:bytes)', '[c]')");
+ + "('[c]')");
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java Wed Oct 21 14:17:54 2009
@@ -132,6 +132,6 @@
pigServer
.store("records", new Path(pathTable, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('m:map(string)', '[m#{a|b}]')");
+ + "('[m#{a|b}]')");
}
}
\ No newline at end of file
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java Wed Oct 21 14:17:54 2009
@@ -75,7 +75,7 @@
/*
* pigServer.store("records", new Path(pathTable, "store").toString(),
* TableStorer.class.getCanonicalName() +
- * "('SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g', '[SF_a, SF_b, SF_c]; [SF_e]')" );
+ * "('[SF_a, SF_b, SF_c]; [SF_e]')" );
*/
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java Wed Oct 21 14:17:54 2009
@@ -265,7 +265,7 @@
"records",
new Path(newPath, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+ + "('[s1, s2]; [s3, s4]')");
}
@@ -290,7 +290,7 @@
"newRecord",
newPath.toString(),
TableStorer.class.getCanonicalName()
- + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+ + "('[s1, s2]; [s3, s4]')");
// check new table content
String query3 = "newRecords = LOAD '"
@@ -340,7 +340,7 @@
Path newPath = new Path(getCurrentMethodName());
pigServer.store("newRecord", newPath.toString(), TableStorer.class
.getCanonicalName()
- + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '')");
+ + "('')");
// check new table content
String query3 = "newRecords = LOAD '"
@@ -390,7 +390,7 @@
Path newPath = new Path(getCurrentMethodName());
pigServer.store("newRecord", newPath.toString(), TableStorer.class
.getCanonicalName()
- + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[]')");
+ + "('[]')");
// check new table content
String query3 = "newRecords = LOAD '"
@@ -443,7 +443,7 @@
"records",
new Path(newPath, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+ + "('[s7, s2]; [s3, s4]')");
Assert.assertNotNull(pigJob.getException());
System.out.println(pigJob.getException());
}
@@ -469,7 +469,7 @@
"records",
new Path(newPath, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s1, s4]')");
+ + "('[s1, s2]; [s1, s4]')");
Assert.assertNotNull(pigJob.getException());
System.out.println(pigJob.getException());
}
@@ -495,7 +495,7 @@
"records",
new Path(newPath, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1]; [s1]')");
+ + "('[s1]; [s1]')");
Assert.assertNotNull(pigJob.getException());
System.out.println(pigJob.getException());
}
@@ -521,7 +521,7 @@
"records",
new Path(newPath, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('s1:int, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+ + "('[s1, s2]; [s3, s4]')");
Assert.assertNotNull(pigJob.getException());
System.out.println(pigJob.getException());
}
@@ -546,7 +546,7 @@
"records",
path.toString(),
TableStorer.class.getCanonicalName()
- + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+ + "('[s1, s2]; [s3, s4]')");
Assert.assertNotNull(pigJob.getException());
System.out.println(pigJob.getException());
}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java Wed Oct 21 14:17:54 2009
@@ -139,7 +139,7 @@
"records",
new Path(pathTable, "store").toString(),
TableStorer.class.getCanonicalName()
- + "('SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g', '[SF_a, SF_b, SF_c]; [SF_e]')");
+ + "('[SF_a, SF_b, SF_c]; [SF_e]')");
}
}
\ No newline at end of file