You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/04/16 21:09:00 UTC

svn commit: r935046 [1/2] - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapreduce/ src/java/org/apache/hadoop/zebra/pig/ src/java/org/apache/hadoop/zebra/types/ src/test/org/apache/hadoop...

Author: yanz
Date: Fri Apr 16 19:08:58 2010
New Revision: 935046

URL: http://svn.apache.org/viewvc?rev=935046&view=rev
Log:
PIG-1351 Addition of type check when writing to basic table (chaow via yanz)

Added:
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestTypeCheck.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypeCheck.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestTypeCheck.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMixedType1.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestProjectionOnFullMap.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord2Map.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecordMap.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSortedBasicTableSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatLocalFS.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollection.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveUnion.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionSourceTableProj.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinInteger.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinMultipleColsSort.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestUnionMixedTypes.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaMap.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Fri Apr 16 19:08:58 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1351 Addition of type check when writing to basic table (chaow via yanz)
+
     PIG-1361 Zebra TableLoader.getSchema() should return the projectionSchema specified in the constructor of TableLoader instead of pruned projection by pig (gauravj via daijy)
 
     PIG-1291 Support of virtual column "source_table" on unsorted table (yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Fri Apr 16 19:08:58 2010
@@ -1564,12 +1564,36 @@ public class BasicTable {
      */
     public TableInserter getInserter(String name, boolean finishWriter)
         throws IOException {
+      return this.getInserter(name, finishWriter, true);
+    }
+    
+    /**
+     * Get an inserter with a given name.
+     * 
+     * @param name
+     *          the name of the inserter. If multiple calls to getInserter with
+     *          the same name has been called, we expect they are the result of
+     *          speculative execution and at most one of them will succeed.
+     * @param finishWriter
+     *          finish the underlying Writer object upon the close of the
+     *          Inserter. Should be set to true if there is only one inserter
+     *          operate on the table, so we should call finish() after the
+     *          Inserter is closed.
+     * @param checkType 
+     *          whether or not to do type check.
+     * 
+     * @return An inserter object.
+     * @throws IOException
+     */
+    public TableInserter getInserter(String name, boolean finishWriter, boolean checkType)
+        throws IOException {
       if (closed) {
         throw new IOException("BasicTable closed");
       }
-      return new BTInserter(name, finishWriter, partition);
+      return new BTInserter(name, finishWriter, partition, checkType);
     }
 
+
     /**
      * Obtain an output stream for creating a Meta Block with the specific name.
      * This method can only be called after we insert all rows into the table.
@@ -1595,10 +1619,15 @@ public class BasicTable {
 
       BTInserter(String name, boolean finishWriter, Partition partition)
           throws IOException {
+        this(name, finishWriter, partition, true);
+      }
+      
+      BTInserter(String name, boolean finishWriter, Partition partition, boolean checkType)
+      throws IOException {
         try {
           cgInserters = new ColumnGroup.Writer.CGInserter[colGroups.length];
           for (int nx = 0; nx < colGroups.length; nx++) {
-            cgInserters[nx] = colGroups[nx].getInserter(name, false);
+            cgInserters[nx] = colGroups[nx].getInserter(name, false, checkType);
           }
           this.finishWriter = finishWriter;
           this.partition = partition;

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Fri Apr 16 19:08:58 2010
@@ -1835,10 +1835,31 @@ class ColumnGroup {
      */
     public TableInserter getInserter(String name, boolean finishWriter)
         throws IOException {
+      return getInserter(name, finishWriter, true);      
+    }
+    
+    /**
+     * Get an inserter with a given name.
+     * 
+     * @param name
+     *          the name of the inserter.
+     * @param finishWriter
+     *          finish the underlying Writer object upon the close of the
+     *          Inserter. Should be set to true if there is only one inserter
+     *          operate on the table, so we should call finish() after the
+     *          Inserter is closed.
+     * @param checkType
+     *          whether or not to do type check.
+     * 
+     * @return A table inserter object.
+     * @throws IOException
+     */
+    public TableInserter getInserter(String name, boolean finishWriter, boolean checkType)
+        throws IOException {
       if (finished) {
         throw new IOException("ColumnGroup has been closed for insertion.");
       }
-      return new CGInserter(name, finishWriter);
+      return new CGInserter(name, finishWriter, checkType);
     }
 
     private void createIndex() throws IOException {
@@ -1888,7 +1909,7 @@ class ColumnGroup {
       TFile.Writer tfileWriter;
       TupleWriter tupleWriter;
       boolean closed = true;
-      
+      boolean checkType = true;
       
       private void createTempFile() throws IOException {
         int maxTrial = 10;
@@ -1937,15 +1958,17 @@ class ColumnGroup {
           }
         }
       }
-
-      CGInserter(String name, boolean finishWriter) throws IOException {
+      
+      CGInserter(String name, boolean finishWriter, boolean checkType) throws IOException {
         this.name = name;
         this.finishWriter = finishWriter;
         this.tupleWriter = new TupleWriter(getSchema());
+        this.checkType = checkType;
+        
         try {
           createTempFile();
           tfileWriter =
-        	  new TFile.Writer(out, getMinBlockSize(conf), cgschema.getCompressor(), cgschema.getComparator(), conf);
+            new TFile.Writer(out, getMinBlockSize(conf), cgschema.getCompressor(), cgschema.getComparator(), conf);
           closed = false;
         }
         finally {
@@ -1978,6 +2001,7 @@ class ColumnGroup {
         }
       }
 
+
       @Override
       public Schema getSchema() {
         return ColumnGroup.Writer.this.getSchema();
@@ -1985,7 +2009,16 @@ class ColumnGroup {
 
       @Override
       public void insert(BytesWritable key, Tuple row) throws IOException {
-        TypesUtils.checkCompatible(row, getSchema());
+        /*
+         * If checkType is set to be true, we check for the first row - this is only a sanity check preventing
+         * users from messing up output schema;
+         * If checkType is set to be false, we do not do any type check. 
+         */
+        if (checkType == true) {
+          TypesUtils.checkCompatible(row, getSchema());
+          checkType = false;
+        }
+        
         DataOutputStream outKey = tfileWriter.prepareAppendKey(key.getLength());
         try {
           outKey.write(key.getBytes(), 0, key.getLength());

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java Fri Apr 16 19:08:58 2010
@@ -147,6 +147,7 @@ public class BasicTableOutputFormat exte
 	private static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
 	public static final String MULTI_OUTPUT_PATH = "mapreduce.lib.table.multi.output.dirs";
 	private static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
+	static final String OUTPUT_CHECKTYPE = "mapreduce.lib.table.output.checktype";
 	private static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint";
 	private static final String OUTPUT_SORTCOLUMNS = "mapreduce.lib.table.output.sortcolumns";
 	private static final String OUTPUT_COMPARATOR =  "mapreduce.lib.table.output.comparator";
@@ -335,6 +336,11 @@ public class BasicTableOutputFormat exte
 	public static void setSchema(JobContext jobContext, String schema) {
 		Configuration conf = jobContext.getConfiguration();
 		conf.set(OUTPUT_SCHEMA, Schema.normalize(schema));
+		
+    // This is to turn off type check for potential corner cases - for internal use only;
+		if (System.getenv("zebra_output_checktype")!= null && System.getenv("zebra_output_checktype").equals("no")) {
+      conf.setBoolean(OUTPUT_CHECKTYPE, false);
+    }
 	}
 
 	/**
@@ -723,7 +729,6 @@ class TableOutputCommitter extends Outpu
 		// TODO Auto-generated method stub
 
 	}
-
 }
 
 /**
@@ -733,22 +738,22 @@ class TableRecordWriter extends RecordWr
 	private final TableInserter inserter[];
 	private org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition op = null;
 
-
 	public TableRecordWriter(String path, TaskAttemptContext context) throws IOException {	
 		Configuration conf = context.getConfiguration();
 		if(conf.getBoolean(BasicTableOutputFormat.IS_MULTI, false) == true) {	  
 			op = (org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition) 
 			ReflectionUtils.newInstance(BasicTableOutputFormat.getZebraOutputPartitionClass(context), conf);
-
-		}  
+		}
+		
+		boolean checkType = conf.getBoolean(BasicTableOutputFormat.OUTPUT_CHECKTYPE, true);
 		Path [] paths = BasicTableOutputFormat.getOutputPaths(context);
 		inserter = new TableInserter[paths.length];
                 String inserterName = "part-" + context.getTaskAttemptID().getTaskID().getId();
 		for(int i = 0; i < paths.length; ++i) {
 			BasicTable.Writer writer =
 				new BasicTable.Writer(paths[i], conf);
-			this.inserter[i] = writer.getInserter( inserterName, true);
-		}	
+			this.inserter[i] = writer.getInserter( inserterName, true, checkType);
+		}
 	}
 
 	@Override

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Fri Apr 16 19:08:58 2010
@@ -55,11 +55,13 @@ import org.apache.pig.impl.util.UDFConte
 public class TableStorer extends StoreFunc implements StoreMetadata {
     private static final String UDFCONTEXT_OUTPUT_SCHEMA = "zebra.UDFContext.outputSchema";
     private static final String UDFCONTEXT_SORT_INFO = "zebra.UDFContext.sortInfo";
+    private static final String UDFCONTEXT_OUTPUT_CHECKTYPE = "zebra.UDFContext.checkType";
 
     static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint";
     static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
     static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
     static final String SORT_INFO = "mapreduce.lib.table.sort.info";
+    static final String OUTPUT_CHECKTYPE = "mapreduce.lib.table.output.checktype";
 
     private String storageHintString = null;
     private String udfContextSignature = null;
@@ -123,6 +125,11 @@ public class TableStorer extends StoreFu
                 this.getClass(), new String[]{ udfContextSignature } );
         properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() );
         properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() );
+        
+        // This is to turn off type check for potential corner cases - for internal use only;
+        if (System.getenv("zebra_output_checktype") != null && System.getenv("zebra_output_checktype").equals("no")) {
+          properties.setProperty( UDFCONTEXT_OUTPUT_CHECKTYPE, "no");
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -160,6 +167,11 @@ public class TableStorer extends StoreFu
                 this.getClass(), new String[]{ udfContextSignature } );
         conf.set( OUTPUT_SCHEMA, properties.getProperty( UDFCONTEXT_OUTPUT_SCHEMA ) );
         conf.set( SORT_INFO, properties.getProperty( UDFCONTEXT_SORT_INFO ) );
+        
+        // Get checktype information from UDFContext and re-store it to job config;
+        if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) {
+          conf.setBoolean(OUTPUT_CHECKTYPE, false);
+        }
     }
 
     @Override
@@ -291,7 +303,8 @@ class TableRecordWriter extends RecordWr
             builder = makeKeyBuilder(types);
         }
 
-        inserter = writer.getInserter( "patition-" + taContext.getTaskAttemptID().getTaskID().getId(), false );
+        boolean checkType = conf.getBoolean(TableStorer.OUTPUT_CHECKTYPE, true);
+        inserter = writer.getInserter("patition-" + taContext.getTaskAttemptID().getTaskID().getId(), false, checkType);
     }
 
     @Override

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java Fri Apr 16 19:08:58 2010
@@ -19,13 +19,20 @@ package org.apache.hadoop.zebra.types;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.hadoop.zebra.schema.ColumnType;
 import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.schema.Schema.ColumnSchema;
 import org.apache.hadoop.zebra.parser.ParseException;
 
 /**
@@ -93,7 +100,117 @@ public class TypesUtils {
       throw new RuntimeException("Internal error: " + e.toString());
     }
   }
-
+  
+  private static void checkTypeError(ColumnSchema cs, ColumnType type) throws IOException {
+    throw new IOException("Incompatible Tuple object - datum is " + type + ", but schema says " + cs.getType());
+  }
+  
+  private static void checkColumnType(ColumnSchema cs, ColumnType type) throws IOException {
+    switch (type) {
+      case BOOL:
+      case DOUBLE:
+      case STRING:
+      case BYTES:
+      case MAP:
+      case COLLECTION:
+      case RECORD:
+        if (cs.getType() != type) {
+          checkTypeError(cs, type);
+        }
+        break;
+      case FLOAT:
+        if (cs.getType() != ColumnType.FLOAT && cs.getType() != ColumnType.DOUBLE) {
+          checkTypeError(cs, type);
+        }
+        break;
+      case LONG:
+        if (cs.getType() != ColumnType.LONG && cs.getType() != ColumnType.FLOAT && cs.getType() != ColumnType.DOUBLE) {
+          checkTypeError(cs, type);
+        }
+        break;
+      case INT:
+        if (cs.getType() != ColumnType.INT && cs.getType() != ColumnType.LONG && cs.getType() != ColumnType.FLOAT && cs.getType() != ColumnType.DOUBLE) {
+          checkTypeError(cs, type);
+        }
+        break;
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private static void checkColumn(Object d, ColumnSchema cs) throws IOException {
+    if (d instanceof Boolean) {
+      checkColumnType(cs, ColumnType.BOOL);   
+    } else if (d instanceof Integer) {
+      checkColumnType(cs, ColumnType.INT);
+    } else if (d instanceof Long) {
+      checkColumnType(cs, ColumnType.LONG);
+    } else if (d instanceof Float) {
+      checkColumnType(cs, ColumnType.FLOAT); 
+    } else if (d instanceof Double) {
+      checkColumnType(cs, ColumnType.DOUBLE);
+    } else if (d instanceof String) {
+      checkColumnType(cs, ColumnType.STRING);
+    } else if (d instanceof DataByteArray) {
+      checkColumnType(cs, ColumnType.BYTES);
+    } else if (d instanceof Map) {
+      checkMapColumn((Map<String, Object>)d, cs);
+    } else if (d instanceof DataBag) {
+      checkCollectionColumn((DataBag)d, cs);
+    } else if (d instanceof Tuple) {
+      checkRecordColumn((Tuple)d, cs);
+    } else {
+      throw new IOException("Unknown data type");
+    }
+  }
+    
+  private static void checkMapColumn(Map<String, Object> m, ColumnSchema cs) throws IOException {
+    checkColumnType(cs, ColumnType.MAP);
+    Schema schema = cs.getSchema();
+    Assert.assertTrue(schema.getNumColumns() == 1);
+    
+    ColumnSchema tempColumnSchema = schema.getColumn(0);
+    if (tempColumnSchema.getType() == ColumnType.BYTES) { // We do not check inside of map if its value type is BYTES;
+                                            // This is for Pig, since it only supports BYTES as map value type.
+      return;
+    }
+        
+    Map<String, Object> m1 = (Map<String, Object>)m;
+    for (Map.Entry<String, Object> e : m1.entrySet()) {
+      Object d = e.getValue();
+      if (d != null) {
+        checkColumn(d, tempColumnSchema);
+        return;   // We only check the first non-null map value in the map;
+      }
+    }
+  }
+  
+  private static void checkCollectionColumn(DataBag bag, ColumnSchema cs) throws IOException {
+    checkColumnType(cs, ColumnType.COLLECTION);
+    Schema schema = cs.getSchema();
+    Assert.assertTrue(schema.getNumColumns() == 1);
+    
+    Iterator<Tuple> iter = bag.iterator();
+    while (iter.hasNext()) {
+      Tuple tempTuple = iter.next();
+      // collection has to be on record;
+      if (tempTuple != null) {
+        checkRecordColumn(tempTuple, schema.getColumn(0));
+        return;     // We only check the first non-null record in the collection;
+      }
+    }     
+  }
+  
+  private static void checkRecordColumn(Tuple d, ColumnSchema cs) throws IOException {
+    checkColumnType(cs, ColumnType.RECORD);
+    checkNumberColumnCompatible(d, cs.getSchema());
+    
+    for (int i=0; i<d.size(); i++) {
+      if (d.get(i) != null) { // "null" can match any type;
+        checkColumn(d.get(i), cs.getSchema().getColumn(i));
+      }  
+    }
+  }
+  
   /**
    * Check whether the input row object is compatible with the expected schema
    * 
@@ -102,12 +219,27 @@ public class TypesUtils {
    * @param schema
    *          Table schema
    * @throws IOException
+   */  
+  public static void checkCompatible(Tuple tuple, Schema schema) throws IOException {
+    // Create a dummy record ColumnSchema since we do not have it;
+    ColumnSchema dummy = new ColumnSchema("dummy", schema);
+
+    checkRecordColumn(tuple, dummy);
+  } 
+  
+  /**
+   * Check whether the input row object is compatible with the expected schema
+   * on number of Columns;
+   * @param tuple
+   *          Input Tuple object
+   * @param schema
+   *          Table schema
+   * @throws IOException
    */
-  public static void checkCompatible(Tuple tuple, Schema schema)
+  public static void checkNumberColumnCompatible(Tuple tuple, Schema schema)
       throws IOException {
-    // TODO: Add more rigorous checking.
     if (tuple.size() != schema.getNumColumns()) {
-      throw new IOException("Incompatible Tuple object - number of fields");
+      throw new IOException("Incompatible Tuple object - tuple has " + tuple.size() + " columns, but schema says " + schema.getNumColumns() + " columns");
     }
   }
 
@@ -158,7 +290,7 @@ public class TypesUtils {
      * @throws IOException
      */
     public void get(DataInputStream in, Tuple row) throws IOException, ParseException {
-      checkCompatible(row, projection.getSchema());
+      checkNumberColumnCompatible(row, projection.getSchema());
       tuple.readFields(in);
       TypesUtils.resetTuple(row);
       try {
@@ -196,7 +328,6 @@ public class TypesUtils {
      * @throws IOException
      */
     public void put(DataOutputStream out, Tuple row) throws IOException {
-      checkCompatible(row, physical);
       row.write(out);
     }
   }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Fri Apr 16 19:08:58 2010
@@ -43,6 +43,7 @@ import org.apache.hadoop.zebra.types.Pro
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -112,7 +113,7 @@ public class TestBasicTable {
           TypesUtils.resetTuple(tuple);
           for (int k = 0; k < tuple.size(); ++k) {
             try {
-              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+              tuple.set(k, new DataByteArray(makeString("col-" + colNames[k], rows * 10).getBytes()));
             } catch (ExecException e) {
               e.printStackTrace();
             }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java Fri Apr 16 19:08:58 2010
@@ -51,7 +51,7 @@ import org.junit.Test;
  * 
  */
 public class TestBasicTableMapSplits {
-  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:float, f15:bytes))";
+  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:double, f15:bytes))";
   final static String STR_STORAGE = "[r.f12, f1, m#{b}]; [m#{a}, r.f11]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java Fri Apr 16 19:08:58 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.zebra.types.Pro
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -78,7 +79,7 @@ public class TestBasicTableProjections {
       for (int row = 0; row < 2; row++) {
         try {
           for (int nx = 0; nx < tuple.size(); nx++)
-            tuple.set(nx, String.format("%c%d%d", 'a' + nx, part + 1, row + 1));
+            tuple.set(nx, new DataByteArray(String.format("%c%d%d", 'a' + nx, part + 1, row + 1).getBytes()));
         } catch (ExecException e) {
           e.printStackTrace();
         }
@@ -114,8 +115,8 @@ public class TestBasicTableProjections {
     scanner.getKey(key);
     Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
     scanner.getValue(value);
-    Assert.assertEquals("f11", value.get(0));
-    Assert.assertEquals("a11", value.get(1));
+    Assert.assertEquals("f11", value.get(0).toString());
+    Assert.assertEquals("a11", value.get(1).toString());
     try {
       value.get(2);
       Assert.fail("Failed to catch out of boundary exceptions.");
@@ -128,24 +129,24 @@ public class TestBasicTableProjections {
     Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
     TypesUtils.resetTuple(value);
     scanner.getValue(value);
-    Assert.assertEquals("f12", value.get(0));
-    Assert.assertEquals("a12", value.get(1));
+    Assert.assertEquals("f12", value.get(0).toString());
+    Assert.assertEquals("a12", value.get(1).toString());
 
     scanner.advance();
     scanner.getKey(key);
     Assert.assertEquals(key, new BytesWritable("k21".getBytes()));
     TypesUtils.resetTuple(value);
     scanner.getValue(value);
-    Assert.assertEquals("f21", value.get(0));
-    Assert.assertEquals("a21", value.get(1));
+    Assert.assertEquals("f21", value.get(0).toString());
+    Assert.assertEquals("a21", value.get(1).toString());
 
     scanner.advance();
     scanner.getKey(key);
     Assert.assertEquals(key, new BytesWritable("k22".getBytes()));
     TypesUtils.resetTuple(value);
     scanner.getValue(value);
-    Assert.assertEquals("f22", value.get(0));
-    Assert.assertEquals("a22", value.get(1));
+    Assert.assertEquals("f22", value.get(0).toString());
+    Assert.assertEquals("a22", value.get(1).toString());
   }
 
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java Fri Apr 16 19:08:58 2010
@@ -50,7 +50,7 @@ import org.junit.Test;
  * 
  */
 public class TestBasicTableSplits {
-  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:float, f15:bytes))";
+  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:double, f15:bytes))";
   // TODO: try map hash split later
   final static String STR_STORAGE = "[r.f12, f1]; [m]";
   private static Configuration conf;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java Fri Apr 16 19:08:58 2010
@@ -46,8 +46,7 @@ import org.junit.runners.Suite;
   TestRecord.class,
   TestRecordMap.class,
   TestSchema.class,
-  TestSimple.class,
-  TestWrite.class  
+  TestWrite.class
 })
 
 public class TestCheckin {

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java Fri Apr 16 19:08:58 2010
@@ -48,8 +48,8 @@ import org.junit.Test;
  * 
  */
 public class TestCollection {
+  final static String STR_SCHEMA = "c:collection(record(a:double, b:double, c:bytes)),c2:collection(record(r1:record(f1:int, f2:string), d:string)),c3:collection(record(e:int,f:bool))";
 
-  final static String STR_SCHEMA = "c:collection(record(a:double, b:float, c:bytes)),c2:collection(record(r1:record(f1:int, f2:string), d:string)),c3:collection(record(c3_1:collection(record(e:int,f:bool))))";
   final static String STR_STORAGE = "[c]";
   private static Configuration conf;
   private static Path path;
@@ -191,8 +191,7 @@ public class TestCollection {
     tuple.set(2, bag3);
 
     int row = 0;
-    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
-        .getBytes()), tuple);
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1).getBytes()), tuple);
 
     row++;
 
@@ -670,7 +669,7 @@ public class TestCollection {
   // Negative should not support 2nd level collection split
   @Test
   public void testSplit1() throws IOException, ParseException {
-    String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+    String STR_SCHEMA = "c:collection(a:double, b:double, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
     String STR_STORAGE = "[c.a]";
     conf = new Configuration();
     conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
@@ -695,7 +694,7 @@ public class TestCollection {
   // Negative should not support none_existent column split
   @Test
   public void testSplit2() throws IOException, ParseException {
-    String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+    String STR_SCHEMA = "c:collection(a:double, b:double, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
     String STR_STORAGE = "[d]";
     conf = new Configuration();
     conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Fri Apr 16 19:08:58 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.zebra.parser.Pa
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -135,7 +136,7 @@ public class TestColumnGroup {
           TypesUtils.resetTuple(tuple);
           for (int k = 0; k < tuple.size(); ++k) {
             try {
-              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+              tuple.set(k, new DataByteArray(makeString("col-" + colNames[k], rows * 10).getBytes()));
             } catch (ExecException e) {
               e.printStackTrace();
             }
@@ -233,7 +234,7 @@ public class TestColumnGroup {
           TypesUtils.resetTuple(tuple);
           for (int k = 0; k < tuple.size(); ++k) {
             try {
-              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+              tuple.set(k, new DataByteArray(makeString("col-" + colNames[k], rows * 10).getBytes()));
             } catch (ExecException e) {
               e.printStackTrace();
             }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java Fri Apr 16 19:08:58 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.zebra.parser.Pa
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -120,7 +121,7 @@ public class TestColumnGroupInserters {
     ParseException {
     fs.delete(path, true);
     System.out.println("testFailureInsertAfterClose");
-    writer = new ColumnGroup.Writer(path, "abc, def ", false, path.getName(), "pig", "gz",
+    writer = new ColumnGroup.Writer(path, "abc:string, def:map(string)", false, path.getName(), "pig", "gz",
         null, null, (short) -1, true, conf);
     TableInserter ins = writer.getInserter("part1", true);
 
@@ -177,7 +178,7 @@ public class TestColumnGroupInserters {
     ParseException {
     fs.delete(path, true);
     System.out.println("testInsertOneRow");
-    writer = new ColumnGroup.Writer(path, "abc, def", false, path.getName(), "pig", "gz",
+    writer = new ColumnGroup.Writer(path, "abc:string, def:map(string)", false, path.getName(), "pig", "gz",
         null, null, (short) -1, true, conf);
     TableInserter ins = writer.getInserter("part1", true);
 
@@ -199,7 +200,7 @@ public class TestColumnGroupInserters {
     ParseException {
     fs.delete(path, true);
     System.out.println("testInsert2Rows");
-    writer = new ColumnGroup.Writer(path, "abc, def", false, path.getName(), "pig", "gz",
+    writer = new ColumnGroup.Writer(path, "abc:string, def:map(string)", false, path.getName(), "pig", "gz",
         null, null, (short) -1, true, conf);
     TableInserter ins = writer.getInserter("part1", true);
 
@@ -233,7 +234,7 @@ public class TestColumnGroupInserters {
     ParseException {
     fs.delete(path, true);
     System.out.println("testInsert2Inserters");
-    writer = new ColumnGroup.Writer(path, "abc, def", false, path.getName(), "pig", "gz",
+    writer = new ColumnGroup.Writer(path, "abc:string, def:map(string)", false, path.getName(), "pig", "gz",
         null, null, (short) -1, true, conf);
     TableInserter ins1 = writer.getInserter("part1", true);
     TableInserter ins2 = writer.getInserter("part2", true);
@@ -270,7 +271,7 @@ public class TestColumnGroupInserters {
     ParseException {
     fs.delete(path, true);
     System.out.println("testFailureOverlappingKeys");
-    writer = new ColumnGroup.Writer(path, "abc, def ", true, path.getName(), "pig", "gz",
+    writer = new ColumnGroup.Writer(path, "abc:string, def:map(string)", true, path.getName(), "pig", "gz",
         null, null, (short) -1, true, conf);
     TableInserter ins1 = writer.getInserter("part1", false);
     TableInserter ins2 = writer.getInserter("part2", false);

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java Fri Apr 16 19:08:58 2010
@@ -46,7 +46,7 @@ import org.junit.Test;
  */
 public class TestColumnGroupName1 {
 
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes";
   final static String STR_STORAGE = "[s1, s2] as PI; [s3, s4] as General; [s5, s6] as ULT";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java Fri Apr 16 19:08:58 2010
@@ -41,7 +41,7 @@ import org.junit.Test;
  */
 public class TestColumnGroupName2 {
 
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes";
   final static String STR_STORAGE = "[s1, s2] as PI secure by uid:users perm:777; [s3, s4] as General; [s5, s6] as ULT";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java Fri Apr 16 19:08:58 2010
@@ -41,7 +41,7 @@ import org.junit.Test;
  */
 public class TestColumnGroupName3 {
 
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes";
   final static String STR_STORAGE = "[s1, s2]; [s3, s4]; [s5, s6]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java Fri Apr 16 19:08:58 2010
@@ -41,7 +41,7 @@ import org.junit.Test;
  */
 public class TestColumnGroupName4 {
 
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes";
   final static String STR_STORAGE = "[s1, s2] as CG1; [s3, s4]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java Fri Apr 16 19:08:58 2010
@@ -41,7 +41,7 @@ import org.junit.Test;
  */
 public class TestColumnGroupName5 {
 
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, s7:int, s8: int, s9:int, s10:int, s11:int, s12:int";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes, s7:int, s8: int, s9:int, s10:int, s11:int, s12:int";
   final static String STR_STORAGE = "[s1]; [s2]; [s3]; [s4]; [s5]; [s6]; [s7]; [s8]; [s9]; [s10]; [s11]; [s12]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java Fri Apr 16 19:08:58 2010
@@ -41,7 +41,7 @@ import org.junit.Test;
  */
 public class TestColumnGroupName6 {
 
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, s7:int, s8: int, s9:int, s10:int, s11:int, s12:int";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes, s7:int, s8: int, s9:int, s10:int, s11:int, s12:int";
   final static String STR_STORAGE = "[s1] as CG0; [s2] as CG1; [s3] as CG2; [s4] as CG3; [s5] as CG4; [s6] as CG5; [s7] as CG6; [s8] as CG7; [s9] as CG8; [s10] as CG9; [s11] as CG10; [s12]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java Fri Apr 16 19:08:58 2010
@@ -64,7 +64,7 @@ public class TestColumnGroupProjections 
         ColumnGroup.drop(path, conf);
     }
 
-    schema = new Schema("a,b,c,d,e,f,g");
+    schema = new Schema("a:string,b:string,c:string,d:string,e:string,f:string,g:string");
 
     ColumnGroup.Writer writer = new ColumnGroup.Writer(path, schema, false, path.getName(),
         "pig", "gz", null, null, (short) -1, true, conf);

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java Fri Apr 16 19:08:58 2010
@@ -76,7 +76,7 @@ public class TestColumnGroupReaders {
       ParseException {
     System.out.println("testInsert2Inserters");
     boolean sorted = false; // true;
-    writer = new ColumnGroup.Writer(path, "col1, colTWO", sorted, path.getName(), "pig",
+    writer = new ColumnGroup.Writer(path, "col1:string, colTWO:map(string)", sorted, path.getName(), "pig",
         "gz", null, null, (short) -1, true, conf);
     TableInserter ins1 = writer.getInserter("part1", false);
     TableInserter ins2 = writer.getInserter("part2", false);
@@ -171,7 +171,7 @@ public class TestColumnGroupReaders {
   @Test
   public void testMultiWriters() throws ExecException, Exception {
     System.out.println("testMultiWriters");
-    ColumnGroup.Writer writer1 = writeOnePart("col1, col2, col3", 1);
+    ColumnGroup.Writer writer1 = writeOnePart("col1:string, col2:map(string), col3:string", 1);
     ColumnGroup.Writer writer2 = writeOnePart(null, 2);
     ColumnGroup.Writer writer3 = writeOnePart(null, 3);
 

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java Fri Apr 16 19:08:58 2010
@@ -51,7 +51,7 @@ import org.junit.Test;
  */
 public class TestColumnGroupSplits {
   final static String outputFile = "TestColumnGroupSplits";
-  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:float, f15:bytes))";
+  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:double, f15:bytes))";
   final static private Configuration conf = new Configuration();
   static private FileSystem fs;
   static private Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java Fri Apr 16 19:08:58 2010
@@ -394,7 +394,7 @@ public class TestColumnGroupWithWorkPath
   @Test
   public void testNullSplits() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupNullSplits");
-    int totalRows = createCG(2, 10, "a, b, c", path, true, true, null);
+    int totalRows = createCG(2, 10, "a:string, b:string, c:string", path, true, true, null);
     ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
     reader.setProjection("a,d,c,f");
     Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
@@ -406,37 +406,37 @@ public class TestColumnGroupWithWorkPath
   @Test
   public void testNegativeSplits() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestNegativeSplits");
-    int totalRows = createCG(2, 100, "a, b, c", path, true, true, null);
+    int totalRows = createCG(2, 100, "a:string, b:string, c:string", path, true, true, null);
     rangeSplitCG(-1, totalRows, "a,d,c,f", path);
   }
 
   @Test
   public void testEmptyCG() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
-    doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
-    doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
+    doReadWrite(path, 0, 0, "a:string, b:string, c:string", "a, d, c, f", true, false, null);
+    doReadWrite(path, 0, 0, "a:string, b:string, c:string", "a, d, c, f", true, true, null);
   }
 
   @Test
   public void testEmptyTFiles() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
-    doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
-    doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);    
+    doReadWrite(path, 2, 0, "a:string, b:string, c:string", "a, d, c, f", true, false, null);
+    doReadWrite(path, 2, 0, "a:string, b:string, c:string", "a, d, c, f", true, true, null);    
   }
 
   public void testNormalCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupNormal");
-    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
-    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
+    doReadWrite(path, 2, 500, "a:string, b:string, c:string", "a, d, c, f", true, false, null);
+    doReadWrite(path, 2, 500, "a:string, b:string, c:string", "a, d, c, f", true, true, null);
   }
 
   @Test
   public void testSomeEmptyTFiles() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
 		for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
-      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
+      doReadWrite(path, 2, 250, "a:string, b:string, c:string", "a, d, c, f", true, false,
           emptyTFiles);
-      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,
+      doReadWrite(path, 2, 250, "a:string, b:string, c:string", "a, d, c, f", true, true,
           emptyTFiles);    
     }
   }
@@ -459,7 +459,7 @@ public class TestColumnGroupWithWorkPath
   @Test
   public void testProjection() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupProjection");
-    int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+    int totalRows = createCG(2, 250, "a:string, b:string, c:string", path, true, true, null);
     Assert.assertEquals(totalRows, countRows(path, null));
     Assert.assertEquals(totalRows, countRows(path, ""));
   }
@@ -467,7 +467,7 @@ public class TestColumnGroupWithWorkPath
   @Test
   public void testDuplicateKeys() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupDuplicateKeys");
-    int totalRows = createCGDupKeys(2, 250, "a, b, c", path);
+    int totalRows = createCGDupKeys(2, 250, "a:string, b:string, c:string", path);
     doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
         path);
   }
@@ -476,7 +476,7 @@ public class TestColumnGroupWithWorkPath
   public void testSortedCGKeySplit() throws IOException, ParseException {
     conf.setInt("table.output.tfile.minBlock.size", 640 * 1024);
     Path path = new Path(rootPath, "TestSortedCGKeySplit");
-    int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+    int totalRows = createCG(2, 250, "a:string, b:string, c:string", path, true, true, null);
     doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
         path);
   }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java Fri Apr 16 19:08:58 2010
@@ -51,11 +51,11 @@ import org.junit.Test;
  */
 public class TestColumnName {
 	final static String STR_SCHEMA = 
-		"f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:float, f15:bytes))";
+		"f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:double, f15:bytes))";
 	final static String STR_STORAGE = "[r.f12, f1, m#{b}]; [m#{_a}, r.f11]";
 
 	final static String INVALID_STR_SCHEMA = 
-		"_f1:bool, _r:record(f11:int, _f12:long), _m:map(string), _c:collection(record(_f13:double, _f14:float, _f15:bytes))";
+		"_f1:bool, _r:record(f11:int, _f12:long), _m:map(string), _c:collection(record(_f13:double, _f14:double, _f15:bytes))";
 	final static String INVALID_STR_STORAGE = "[_r.f12, _f1, _m#{b}]; [_m#{_a}, _r.f11]";
 
 	private static Configuration conf = new Configuration();

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMixedType1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMixedType1.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMixedType1.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMixedType1.java Fri Apr 16 19:08:58 2010
@@ -53,7 +53,7 @@ import org.junit.Test;
  * 
  */
 public class TestMixedType1 {
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)), m1:map(string),m2:map(map(int)), c:collection(record(f13:double, f14:float, f15:bytes))";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes, r1:record(f1:int, f2:long), r2:record(r3:record(f3:double, f4)), m1:map(string),m2:map(map(int)), c:collection(record(f13:double, f14:double, f15:bytes))";
   final static String STR_STORAGE = "[s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3]; [s5, s6, m2#{x|y}]; [r1.f2, m1#{b}]; [r2.r3.f4, m2#{z}]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestProjectionOnFullMap.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestProjectionOnFullMap.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestProjectionOnFullMap.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestProjectionOnFullMap.java Fri Apr 16 19:08:58 2010
@@ -51,7 +51,7 @@ import org.junit.Test;
  * 
  */
 public class TestProjectionOnFullMap {
-  final static String STR_SCHEMA = "f1:string, f2:map";
+  final static String STR_SCHEMA = "f1:string, f2:map(string)";
   final static String STR_STORAGE = "[f1]; [f2]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java Fri Apr 16 19:08:58 2010
@@ -57,7 +57,7 @@ public class TestRecord {
   private static Path path;
   private static FileSystem fs;
   static int count;
-  final static String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(f5, r3:record(f3:float, f4))";
+  final static String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(f5:string, r3:record(f3:double, f4))";
   final static String STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
 
   @BeforeClass

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord2Map.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord2Map.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord2Map.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord2Map.java Fri Apr 16 19:08:58 2010
@@ -60,7 +60,7 @@ public class TestRecord2Map {
   private static Path path;
   private static FileSystem fs;
   static int count;
-  static String STR_SCHEMA = "r1:record(f1:int, f2:map(long)), r2:record(r3:record(f3:float, f4:map(int)))";
+  static String STR_SCHEMA = "r1:record(f1:int, f2:map(long)), r2:record(r3:record(f3:double, f4:map(int)))";
   static String STR_STORAGE = "[r1.f1, r1.f2#{x|y}]; [r2.r3.f4]; [r1.f2, r2.r3.f3, r2.r3.f4#{a|c}]";
 
   @BeforeClass

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecordMap.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecordMap.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecordMap.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecordMap.java Fri Apr 16 19:08:58 2010
@@ -60,7 +60,7 @@ public class TestRecordMap {
   static private Path path;
   static private FileSystem fs;
   static int count;
-  static String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4:map(int)))";
+  static String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:double, f4:map(int)))";
   static String STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3, r2.r3.f4#{a|c}]";
 
   @BeforeClass

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java Fri Apr 16 19:08:58 2010
@@ -109,7 +109,7 @@ public class TestSchema {
 
   @Test
   public void testSimple() throws IOException, ParseException {
-    String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+    String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes";
     String STR_STORAGE = "[s1, s2]; [s3, s4]; [s5, s6]";
 
     // Build Table and column groups
@@ -201,7 +201,7 @@ public class TestSchema {
   @Test
   public void testRecord() throws IOException, ParseException {
     BasicTable.drop(path, conf);
-    String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))";
+    String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:double, f4))";
     String STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
 
     path = new Path(getCurrentMethodName());

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java Fri Apr 16 19:08:58 2010
@@ -96,7 +96,7 @@ public class TestSimple {
     tuple.set(0, true); // bool
     tuple.set(1, 1); // int
     tuple.set(2, 1001L); // long
-    tuple.set(3, 1.1); // float
+    tuple.set(3, 1.1f); // float
     tuple.set(4, "hello world 1"); // string
     tuple.set(5, new DataByteArray("hello byte 1")); // byte
 
@@ -108,7 +108,7 @@ public class TestSimple {
     tuple.set(0, false);
     tuple.set(1, 2); // int
     tuple.set(2, 1002L); // long
-    tuple.set(3, 3.1); // float
+    tuple.set(3, 3.1f); // float
     tuple.set(4, "hello world 2"); // string
     tuple.set(5, new DataByteArray("hello byte 2")); // byte
     inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
@@ -142,7 +142,7 @@ public class TestSimple {
     Assert.assertEquals(true, RowValue.get(5));
     Assert.assertEquals(1, RowValue.get(4));
     Assert.assertEquals(1001L, RowValue.get(3));
-    Assert.assertEquals(1.1, RowValue.get(2));
+    Assert.assertEquals(1.1f, RowValue.get(2));
     Assert.assertEquals("hello world 1", RowValue.get(1));
     Assert.assertEquals("hello byte 1", RowValue.get(0).toString());
     scanner.advance();
@@ -152,7 +152,7 @@ public class TestSimple {
     Assert.assertEquals(false, RowValue.get(5));
     Assert.assertEquals(2, RowValue.get(4));
     Assert.assertEquals(1002L, RowValue.get(3));
-    Assert.assertEquals(3.1, RowValue.get(2));
+    Assert.assertEquals(3.1f, RowValue.get(2));
     Assert.assertEquals("hello world 2", RowValue.get(1));
     Assert.assertEquals("hello byte 2", RowValue.get(0).toString());
   }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSortedBasicTableSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSortedBasicTableSplits.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSortedBasicTableSplits.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSortedBasicTableSplits.java Fri Apr 16 19:08:58 2010
@@ -50,7 +50,7 @@ import org.junit.Test;
  * 
  */
 public class TestSortedBasicTableSplits {
-  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:float, f15:bytes))";
+  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:double, f15:bytes))";
   // TODO: try map hash split later
   final static String STR_STORAGE = "[r.f12, f1]; [m]";
   private static Configuration conf;

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestTypeCheck.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestTypeCheck.java?rev=935046&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestTypeCheck.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestTypeCheck.java Fri Apr 16 19:08:58 2010
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests Zebra's write-time type checking at the I/O level: inserts rows whose
+ * field types do or do not match the table schema and verifies the outcome.
+ */
+public class TestTypeCheck extends BaseTestCase {
+  final static String STR_STORAGE = "[r.f12, f1]; [m]";
+  private static Path path;
+  
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    init();
+  }
+  
+  @Test
+  public void testPositive1() throws IOException, ParseException {
+    path = getTableFullPath("TestTypeCheck");
+    removeDir(path);
+    
+    String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:double, f15:bytes))";
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    tuple.set(0, true);
+
+    Tuple tupRecord;
+    try {
+      tupRecord = TypesUtils.createTuple(schema.getColumnSchema("r")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    tupRecord.set(0, 1);
+    tupRecord.set(1, 1001L);
+    tuple.set(1, tupRecord);
+
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("a", "x");
+    map.put("b", "y");
+    map.put("c", "z");
+    tuple.set(2, map);
+
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(3).getSchema().getColumn(0).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(3, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+  
+  // Negative test: a float column receives a double value, which should be rejected.
+  @Test (expected = IOException.class)
+  public void testNegative1() throws IOException, ParseException {
+    path = getTableFullPath("TestTypeCheck");
+    removeDir(path);
+    String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:float, f15:bytes))";
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    tuple.set(0, true);
+
+    Tuple tupRecord;
+    try {
+      tupRecord = TypesUtils.createTuple(schema.getColumnSchema("r")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    tupRecord.set(0, 1);
+    tupRecord.set(1, 1001L);
+    tuple.set(1, tupRecord);
+
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("a", "x");
+    map.put("b", "y");
+    map.put("c", "z");
+    tuple.set(2, map);
+
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(3).getSchema().getColumn(0).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(3, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  // Negative test: an int column receives a string value, which should be rejected.
+  @Test (expected = IOException.class)
+  public void testNegative2() throws IOException, ParseException {
+    path = getTableFullPath("TestTypeCheck");
+    removeDir(path);
+
+    String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(record(f13:double, f14:double, f15:bytes))";
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    tuple.set(0, true);
+
+    Tuple tupRecord;
+    try {
+      tupRecord = TypesUtils.createTuple(schema.getColumnSchema("r")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    tupRecord.set(0, "abc");
+    tupRecord.set(1, 1001L);
+    tuple.set(1, tupRecord);
+
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("a", "x");
+    map.put("b", "y");
+    map.put("c", "z");
+    tuple.set(2, map);
+
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(3).getSchema().getColumn(0).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(3, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+     removeDir(path);
+  }
+}

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java Fri Apr 16 19:08:58 2010
@@ -53,7 +53,7 @@ import org.junit.Test;
  * 
  */
 public class TestWrite {
-  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)), m1:map(string),m2:map(map(int)), c:collection(record(f13:double, f14:float, f15:bytes))";
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:double, s5:string, s6:bytes, r1:record(f1:int, f2:long), r2:record(r3:record(f3:double, f4)), m1:map(string),m2:map(map(int)), c:collection(record(f13:double, f14:double, f15:bytes))";
   final static String STR_STORAGE = "[s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3]; [s5, s6, m2#{x|y}]; [r1.f2, m1#{b}]; [r2.r3.f4, m2#{z}]";
   private static Configuration conf;
   private static Path path;

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java Fri Apr 16 19:08:58 2010
@@ -300,7 +300,7 @@ public class TestBasicTableIOFormatLocal
     fileSys.delete(outPath, true);
     jobConf.setOutputFormat(BasicTableOutputFormat.class);
     BasicTableOutputFormat.setOutputPath(jobConf, outPath);
-    BasicTableOutputFormat.setSchema(jobConf, "fileName, wordPos, lineNo");
+    BasicTableOutputFormat.setSchema(jobConf, "fileName:string, wordPos:int, lineNo:int");
 
     // set map-only job.
     jobConf.setNumReduceTasks(0);
@@ -596,7 +596,7 @@ public class TestBasicTableIOFormatLocal
     jobConf.setReducerClass(InvertedIndexGen.ReduceClass.class);
     jobConf.setCombinerClass(InvertedIndexGen.CombinerClass.class);
     BasicTableOutputFormat.setOutputPath(jobConf, invIndexTablePath);
-    BasicTableOutputFormat.setSchema(jobConf, "count, index");
+    BasicTableOutputFormat.setSchema(jobConf, "count:int, index:map()");
     jobConf.setNumReduceTasks(options.numReducer);
 
     JobClient.runJob(jobConf);
@@ -948,7 +948,7 @@ public class TestBasicTableIOFormatLocal
     jobConf.setReducerClass(FreqWords.ReduceClass.class);
     jobConf.setCombinerClass(FreqWords.CombinerClass.class);
     BasicTableOutputFormat.setOutputPath(jobConf, freqWordTablePath);
-    BasicTableOutputFormat.setSchema(jobConf, "count");
+    BasicTableOutputFormat.setSchema(jobConf, "count:int");
     jobConf.setNumReduceTasks(1);
 
     JobClient.runJob(jobConf);

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatLocalFS.java?rev=935046&r1=935045&r2=935046&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatLocalFS.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatLocalFS.java Fri Apr 16 19:08:58 2010
@@ -287,7 +287,7 @@ public class TestBasicTableIOFormatLocal
 		fileSys.delete(outPath, true);
 		job.setOutputFormatClass(BasicTableOutputFormat.class);
 		BasicTableOutputFormat.setOutputPath(job, outPath);
-		BasicTableOutputFormat.setSchema(job, "fileName, wordPos, lineNo");
+		BasicTableOutputFormat.setSchema(job, "fileName:string, wordPos:int, lineNo:int");
 
 		// set map-only job.
 		job.setNumReduceTasks(0);
@@ -567,7 +567,7 @@ public class TestBasicTableIOFormatLocal
 		job.setReducerClass(InvertedIndexGen.ReduceClass.class);
 		job.setCombinerClass(InvertedIndexGen.CombinerClass.class);
 		BasicTableOutputFormat.setOutputPath(job, invIndexTablePath);
-		BasicTableOutputFormat.setSchema(job, "count, index");
+		BasicTableOutputFormat.setSchema(job, "count:int, index:map()");
 		job.setNumReduceTasks(options.numReducer);
 
 		job.submit();
@@ -906,7 +906,7 @@ public class TestBasicTableIOFormatLocal
 		job.setReducerClass(FreqWords.ReduceClass.class);
 		job.setCombinerClass(FreqWords.CombinerClass.class);
 		BasicTableOutputFormat.setOutputPath(job, freqWordTablePath);
-		BasicTableOutputFormat.setSchema(job, "count");
+		BasicTableOutputFormat.setSchema(job, "count:int");
 		job.setNumReduceTasks(1);
 
 		job.submit();