You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/10/21 16:17:55 UTC

svn commit: r828027 - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/pig/ src/test/org/apache/hadoop/zebra/pig/

Author: gates
Date: Wed Oct 21 14:17:54 2009
New Revision: 828027

URL: http://svn.apache.org/viewvc?rev=828027&view=rev
Log:
PIG-944  Change schema to be taken from StoreConfig instead of TableStorer's constructor.

Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Wed Oct 21 14:17:54 2009
@@ -15,6 +15,9 @@
 
   BUG FIXES
 
+	PIG-944  Change schema to be taken from StoreConfig instead of
+	TableStorer's constructor (yanz via gates).
+
     PIG-918. Fix infinite loop when only columns in the first column group
     are specified. (Yan Zhou via rangadi)
  

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java Wed Oct 21 14:17:54 2009
@@ -21,72 +21,52 @@
 import java.util.Iterator;
 
 import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.ColumnType;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
-/**
- * A simple schema converter that only understands three field types.
- */
 class SchemaConverter {
-  enum FieldSchemaMaker {
-    SimpleField("SF_") {
-      @Override
-      FieldSchema toFieldSchema(String name) {
-        return new FieldSchema(name, DataType.BYTEARRAY);
-      }
-    },
-
-    MapField("MF_") {
-      @Override
-      FieldSchema toFieldSchema(String name) {
-        // TODO: how to convey key and value types?
-        return new FieldSchema(name, DataType.MAP);
-      }
-    },
-
-    MapListField("MLF_") {
-      @Override
-      FieldSchema toFieldSchema(String name) throws FrontendException {
-        Schema tupleSchema = new Schema();
-        tupleSchema.add(MapField.toFieldSchema(null));
-        tupleSchema.setTwoLevelAccessRequired(true);
-        return new FieldSchema(name, tupleSchema, DataType.BAG);
-      }
-    };
-
-    private String prefix;
-
-    FieldSchemaMaker(String prefix) {
-      this.prefix = prefix;
-    }
-
-    abstract FieldSchema toFieldSchema(String name) throws FrontendException;
-
-    public static FieldSchema makeFieldSchema(String colname)
-        throws FrontendException {
-      for (FieldSchemaMaker e : FieldSchemaMaker.values()) {
-        if (colname.startsWith(e.prefix)) {
-          return e.toFieldSchema(colname.substring(e.prefix.length()));
-        }
-      }
-      throw new FrontendException("Cannot determine type from column name");
-    }
-    
-    public static String makeColumnName(FieldSchema fs)
-        throws FrontendException {
-      if (fs.alias == null) {
-        throw new FrontendException("No alias provided for field schema");
-      }
-      for (FieldSchemaMaker e : FieldSchemaMaker.values()) {
-        FieldSchema expected = e.toFieldSchema("dummy");
-        if (FieldSchema.equals(fs, expected, false, true)) {
-          return e.prefix + fs.alias;
-        }
-      }
-      throw new FrontendException("Unsupported field schema");
+  public static ColumnType toTableType(byte ptype)
+  {
+    ColumnType ret;
+    switch (ptype) {
+      case DataType.INTEGER:
+        ret = ColumnType.INT; 
+        break;
+      case DataType.LONG:
+        ret = ColumnType.LONG; 
+        break;
+      case DataType.FLOAT:
+        ret = ColumnType.FLOAT; 
+        break;
+      case DataType.DOUBLE:
+        ret = ColumnType.DOUBLE; 
+        break;
+      case DataType.BOOLEAN:
+        ret = ColumnType.BOOL; 
+        break;
+      case DataType.BAG:
+        ret = ColumnType.COLLECTION; 
+        break;
+      case DataType.MAP:
+        ret = ColumnType.MAP; 
+        break;
+      case DataType.TUPLE:
+        ret = ColumnType.RECORD; 
+        break;
+      case DataType.CHARARRAY:
+        ret = ColumnType.STRING; 
+        break;
+      case DataType.BYTEARRAY:
+        ret = ColumnType.BYTES; 
+        break;
+      default:
+        ret = null;
+        break;
     }
+    return ret;
   }
   
   public static Schema toPigSchema(
@@ -97,24 +77,39 @@
     	org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema = 
     		tschema.getColumn(col);
 			if (columnSchema != null) {
-        ret.add(new FieldSchema(col, columnSchema.getType().pigDataType()));
+        ColumnType ct = columnSchema.getType();
+        if (ct == org.apache.hadoop.zebra.schema.ColumnType.RECORD ||
+            ct == org.apache.hadoop.zebra.schema.ColumnType.COLLECTION)
+          ret.add(new FieldSchema(col, toPigSchema(columnSchema.getSchema()), ct.pigDataType()));
+        else
+          ret.add(new FieldSchema(col, ct.pigDataType()));
 			} else {
 				ret.add(new FieldSchema(null, null));
 			}
     }
-    
     return ret;
   }
 
   public static org.apache.hadoop.zebra.schema.Schema fromPigSchema(
       Schema pschema) throws FrontendException, ParseException {
-    String[] colnames = new String[pschema.size()];
-    int i = 0;
-    for (Iterator<FieldSchema> it = pschema.getFields().iterator(); it
-        .hasNext(); ++i) {
-      FieldSchema fs = it.next();
-      colnames[i] = FieldSchemaMaker.makeColumnName(fs);
+    org.apache.hadoop.zebra.schema.Schema tschema = new org.apache.hadoop.zebra.schema.Schema();
+    Schema.FieldSchema columnSchema;
+    for (int i = 0; i < pschema.size(); i++) {
+    	columnSchema = pschema.getField(i);
+    	if (columnSchema != null) {
+    		if (DataType.isSchemaType(columnSchema.type))
+    			tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, 
+    					fromPigSchema(columnSchema.schema), toTableType(columnSchema.type)));
+    		else if (columnSchema.type == DataType.MAP)
+    			tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, 
+    					new org.apache.hadoop.zebra.schema.Schema(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, 
+    							org.apache.hadoop.zebra.schema.ColumnType.BYTES)), toTableType(columnSchema.type)));
+    		else
+    			tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, toTableType(columnSchema.type)));
+		  } else {
+		  	tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, ColumnType.ANY));
+		  }
     }
-    return new org.apache.hadoop.zebra.schema.Schema(colnames);
+    return tschema;
   }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Wed Oct 21 14:17:54 2009
@@ -35,18 +35,17 @@
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 
 public class TableStorer implements StoreFunc {
-	private String schemaString;
 	private String storageHintString;
 
 	public TableStorer() {	  
 	}
 
-	public TableStorer(String schemaStr, String storageHintStr) throws ParseException, IOException {
-		schemaString = schemaStr;
+	public TableStorer(String storageHintStr) throws ParseException, IOException {
 		storageHintString = storageHintStr;
 	}
   
@@ -69,10 +68,6 @@
 	public Class getStorePreparationClass() throws IOException {
 		return TableOutputFormat.class;
 	}
-
-	public String getSchemaString() {
-		return schemaString;  
-	}
   
 	public String getStorageHintString() {
 		return storageHintString;  
@@ -94,10 +89,16 @@
 	@Override
 	public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
 		StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
-		String location = storeConfig.getLocation();
+		String location = storeConfig.getLocation(), schemaStr;
+    Schema schema = storeConfig.getSchema();
+    try {
+      schemaStr = SchemaConverter.fromPigSchema(schema).toString();
+    } catch (ParseException e) {
+      throw new IOException("Exception thrown from SchemaConverter: " + e.getMessage());
+    }
 		TableStorer storeFunc = (TableStorer)MapRedUtil.getStoreFunc(job);   
 		BasicTable.Writer writer = new BasicTable.Writer(new Path(location), 
-				storeFunc.getSchemaString(), storeFunc.getStorageHintString(), false, job);
+				schemaStr, storeFunc.getStorageHintString(), false, job);
 		writer.finish();
 	}
     
@@ -134,11 +135,11 @@
 		writer.finish();
 	}
 
-	@Override
-	public void write(BytesWritable key, Tuple value) throws IOException {
-		if (key == null) {
-			key = KEY0;
-		}
-		inserter.insert(key, value);
-	}
+  @Override
+  public void write(BytesWritable key, Tuple value) throws IOException {
+    if (key == null) {
+      key = KEY0;
+    }
+    inserter.insert(key, value);
+  }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java Wed Oct 21 14:17:54 2009
@@ -143,6 +143,6 @@
 
     pigServer.store("records", new Path(pathTable, "store").toString(),
         TableStorer.class.getCanonicalName()
-            + "('c:collection(a:double, b:float, c:bytes)', '[c]')");
+            + "('[c]')");
   }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java Wed Oct 21 14:17:54 2009
@@ -132,6 +132,6 @@
     pigServer
         .store("records", new Path(pathTable, "store").toString(),
             TableStorer.class.getCanonicalName()
-                + "('m:map(string)', '[m#{a|b}]')");
+                + "('[m#{a|b}]')");
   }
 }
\ No newline at end of file

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java Wed Oct 21 14:17:54 2009
@@ -75,7 +75,7 @@
     /*
      * pigServer.store("records", new Path(pathTable, "store").toString(),
      * TableStorer.class.getCanonicalName() +
-     * "('SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g', '[SF_a, SF_b, SF_c]; [SF_e]')" );
+     * "('[SF_a, SF_b, SF_c]; [SF_e]')" );
      */
   }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java Wed Oct 21 14:17:54 2009
@@ -265,7 +265,7 @@
             "records",
             new Path(newPath, "store").toString(),
             TableStorer.class.getCanonicalName()
-                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+                + "('[s1, s2]; [s3, s4]')");
 
   }
 
@@ -290,7 +290,7 @@
             "newRecord",
             newPath.toString(),
             TableStorer.class.getCanonicalName()
-                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+                + "('[s1, s2]; [s3, s4]')");
 
     // check new table content
     String query3 = "newRecords = LOAD '"
@@ -340,7 +340,7 @@
     Path newPath = new Path(getCurrentMethodName());
     pigServer.store("newRecord", newPath.toString(), TableStorer.class
         .getCanonicalName()
-        + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '')");
+        + "('')");
 
     // check new table content
     String query3 = "newRecords = LOAD '"
@@ -390,7 +390,7 @@
     Path newPath = new Path(getCurrentMethodName());
     pigServer.store("newRecord", newPath.toString(), TableStorer.class
         .getCanonicalName()
-        + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[]')");
+        + "('[]')");
 
     // check new table content
     String query3 = "newRecords = LOAD '"
@@ -443,7 +443,7 @@
             "records",
             new Path(newPath, "store").toString(),
             TableStorer.class.getCanonicalName()
-                + "('s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+                + "('[s7, s2]; [s3, s4]')");
     Assert.assertNotNull(pigJob.getException());
     System.out.println(pigJob.getException());
   }
@@ -469,7 +469,7 @@
               "records",
               new Path(newPath, "store").toString(),
               TableStorer.class.getCanonicalName()
-                  + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s1, s4]')");
+                  + "('[s1, s2]; [s1, s4]')");
       Assert.assertNotNull(pigJob.getException());
       System.out.println(pigJob.getException());
   }
@@ -495,7 +495,7 @@
             "records",
             new Path(newPath, "store").toString(),
             TableStorer.class.getCanonicalName()
-                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1]; [s1]')");
+                + "('[s1]; [s1]')");
     Assert.assertNotNull(pigJob.getException());
     System.out.println(pigJob.getException());
   }
@@ -521,7 +521,7 @@
             "records",
             new Path(newPath, "store").toString(),
             TableStorer.class.getCanonicalName()
-                + "('s1:int, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+                + "('[s1, s2]; [s3, s4]')");
     Assert.assertNotNull(pigJob.getException());
     System.out.println(pigJob.getException());
   }
@@ -546,7 +546,7 @@
             "records",
             path.toString(),
             TableStorer.class.getCanonicalName()
-                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+                + "('[s1, s2]; [s3, s4]')");
     Assert.assertNotNull(pigJob.getException());
     System.out.println(pigJob.getException());
   }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java?rev=828027&r1=828026&r2=828027&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java Wed Oct 21 14:17:54 2009
@@ -139,7 +139,7 @@
             "records",
             new Path(pathTable, "store").toString(),
             TableStorer.class.getCanonicalName()
-                + "('SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g', '[SF_a, SF_b, SF_c]; [SF_e]')");
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
 
   }
 }
\ No newline at end of file