Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC

svn commit: r803312 [14/16] - in /hadoop/pig/trunk: ./ contrib/zebra/ contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/ contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/ contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/...

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollection.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollection.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollection.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on complicated column types.
+ * 
+ */
+public class TestCollection {
+
+  final static String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes)";
+  final static String STR_STORAGE = "[c]";
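+  // STR_SCHEMA declares a single column "c" of type collection with fields
+  // a:double, b:float and c:bytes; the storage spec "[c]" appears to place that
+  // column in its own column group (the projection tests below read it back by name).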
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path path;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+
+    System.out.println("ONCE SETUP !! ---------");
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    // path = new Path(pathWorking, this.getClass().getSimpleName());
+    path = fs.getWorkingDirectory();
+    System.out.println("path =" + path);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    /*
+     * conf = new Configuration();
+     * conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+     * conf.setInt("table.input.split.minSize", 64 * 1024);
+     * conf.set("table.output.tfile.compression", "none");
+     * 
+     * RawLocalFileSystem rawLFS = new RawLocalFileSystem(); fs = new
+     * LocalFileSystem(rawLFS); path = new Path(fs.getWorkingDirectory(),
+     * this.getClass() .getSimpleName()); fs = path.getFileSystem(conf); // drop
+     * any previous tables BasicTable.drop(path, conf); BasicTable.Writer writer
+     * = new BasicTable.Writer(path, STR_SCHEMA, STR_STORAGE, false, conf);
+     */
+    /*
+     * /* Schema schema = writer.getSchema(); Tuple tuple =
+     * TypesUtils.createTuple(schema);
+     * 
+     * final int numsBatch = 10; final int numsInserters = 2; TableInserter[]
+     * inserters = new TableInserter[numsInserters]; for (int i = 0; i <
+     * numsInserters; i++) { inserters[i] = writer.getInserter("ins" + i,
+     * false); }
+     * 
+     * for (int b = 0; b < numsBatch; b++) { for (int i = 0; i < numsInserters;
+     * i++) { TypesUtils.resetTuple(tuple);
+     * 
+     * DataBag bagColl = TypesUtils.createBag(); Schema schColl =
+     * schema.getColumn(0).getSchema(); Tuple tupColl1 =
+     * TypesUtils.createTuple(schColl); Tuple tupColl2 =
+     * TypesUtils.createTuple(schColl); byte[] abs1 = new byte[3]; byte[] abs2 =
+     * new byte[4]; tupColl1.set(0, 3.1415926); tupColl1.set(1, 1.6); abs1[0] =
+     * 11; abs1[1] = 12; abs1[2] = 13; tupColl1.set(2, new DataByteArray(abs1));
+     * bagColl.add(tupColl1); tupColl2.set(0, 123.456789); tupColl2.set(1, 100);
+     * abs2[0] = 21; abs2[1] = 22; abs2[2] = 23; abs2[3] = 24; tupColl2.set(2,
+     * new DataByteArray(abs2)); bagColl.add(tupColl2); tuple.set(0, bagColl);
+     * 
+     * inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple); }
+     * } for (int i = 0; i < numsInserters; i++) { inserters[i].close(); }
+     */
+
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(0).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(0, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // row 2 (advance the row counter so the second record gets key "k12")
+    row++;
+    bagColl.clear();
+    TypesUtils.resetTuple(tupColl1);
+    TypesUtils.resetTuple(tupColl2);
+    tupColl1.set(0, 7654.321);
+    tupColl1.set(1, 0.0001);
+    abs1[0] = 31;
+    abs1[1] = 32;
+    abs1[2] = 33;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 0.123456789);
+    tupColl2.set(1, 0.3333);
+    abs2[0] = 41;
+    abs2[1] = 42;
+    abs2[2] = 43;
+    abs2[3] = 44;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(0, bagColl);
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  @Test
+  public void testRead1() throws IOException, ParseException {
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.pig.table.pig.TableLoader('c');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    int row = 0;
+    int inner = 0;
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      row++;
+      if (row == 1) {
+        Iterator<Tuple> bag = ((DataBag) cur.get(0)).iterator();
+        while (bag.hasNext()) {
+          Tuple cur2 = bag.next();
+          inner++;
+          if (inner == 1) {
+            Assert.assertEquals(3.1415926, cur2.get(0));
+            Assert.assertEquals(1.6, cur2.get(1));
+          }
+          if (inner == 2) {
+            Assert.assertEquals(123.456789, cur2.get(0));
+            Assert.assertEquals(100, cur2.get(1));
+          }
+        }// inner while
+      } // if count ==1
+      if (row == 2) {
+        Iterator<Tuple> bag = ((DataBag) cur.get(0)).iterator();
+        while (bag.hasNext()) {
+          Tuple cur2 = bag.next();
+          inner++;
+          if (inner == 1) {
+            Assert.assertEquals(7654.321, cur2.get(0));
+            Assert.assertEquals(0.0001, cur2.get(1));
+            System.out.println("cur : " + cur2.toString());
+            System.out.println("byte : " + cur2.get(2).toString());
+          }
+          if (inner == 2) {
+            Assert.assertEquals(0.123456789, cur2.get(0));
+            Assert.assertEquals(0.3333, cur2.get(1));
+            System.out.println("byte : " + cur2.get(2).toString());
+          }
+        }// inner while
+      }// if count ==2
+      TypesUtils.resetTuple(cur);
+    }
+  }
+
+  @Test
+  // Negative: non-existent column, using the IO layer implementation
+  public void testRead2() throws IOException, ParseException {
+    String projection = new String("d");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    // RowValue is an empty record
+    Assert.assertEquals(false, RowValue.isNull());
+    Assert.assertEquals(null, RowValue.get(0));
+    Assert.assertEquals(1, RowValue.size());
+    reader.close();
+  }
+
+  // Negative: non-existent column. TODO: currently fails with a NullPointerException
+  public void testReadNeg2() throws IOException, ParseException {
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('d');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    // TODO: verify it returns a tuple with null value
+  }
+
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCollectionTableLoader {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestCollectionTable");
+    // drop any previous tables
+    BasicTable.drop(pathTable, conf);
+
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "c:collection(a:double, b:float, c:bytes)", "[c]", false, conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+
+        DataBag bagColl = TypesUtils.createBag();
+        Schema schColl = schema.getColumn(0).getSchema();
+        Tuple tupColl1 = TypesUtils.createTuple(schColl);
+        Tuple tupColl2 = TypesUtils.createTuple(schColl);
+        byte[] abs1 = new byte[3];
+        byte[] abs2 = new byte[4];
+        tupColl1.set(0, 3.1415926);
+        tupColl1.set(1, 1.6);
+        abs1[0] = 11;
+        abs1[1] = 12;
+        abs1[2] = 13;
+        tupColl1.set(2, new DataByteArray(abs1));
+        bagColl.add(tupColl1);
+        tupColl2.set(0, 123.456789);
+        tupColl2.set(1, 100);
+        abs2[0] = 21;
+        abs2[1] = 22;
+        abs2[2] = 23;
+        abs2[3] = 24;
+        tupColl2.set(2, new DataByteArray(abs2));
+        bagColl.add(tupColl2);
+        tuple.set(0, bagColl);
+
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  @Test
+  public void testReader() throws ExecException, IOException {
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('c');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCollectionTableStorer {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestCollectionTable");
+    // drop any previous tables
+    BasicTable.drop(pathTable, conf);
+
+    System.out.println("table path=" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "c:collection(a:double, b:float, c:bytes)", "[c]", false, conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+
+        DataBag bagColl = TypesUtils.createBag();
+        Schema schColl = schema.getColumn(0).getSchema();
+        Tuple tupColl1 = TypesUtils.createTuple(schColl);
+        Tuple tupColl2 = TypesUtils.createTuple(schColl);
+        byte[] abs1 = new byte[3];
+        byte[] abs2 = new byte[4];
+        tupColl1.set(0, 3.1415926);
+        tupColl1.set(1, 1.6);
+        abs1[0] = 11;
+        abs1[1] = 12;
+        abs1[2] = 13;
+        tupColl1.set(2, new DataByteArray(abs1));
+        bagColl.add(tupColl1);
+        tupColl2.set(0, 123.456789);
+        tupColl2.set(1, 100);
+        abs2[0] = 21;
+        abs2[1] = 22;
+        abs2[2] = 23;
+        abs2[3] = 24;
+        tupColl2.set(2, new DataByteArray(abs2));
+        bagColl.add(tupColl2);
+        tuple.set(0, bagColl);
+
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    System.out.println("testStorer");
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+
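+    // Store the loaded relation back as a new Zebra table under <pathTable>/store,
+    // reusing the same collection schema and "[c]" storage spec as the source table.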
+    pigServer.store("records", new Path(pathTable, "store").toString(),
+        TableStorer.class.getCanonicalName()
+            + "('c:collection(a:double, b:float, c:bytes)', '[c]')");
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestMapTableLoader {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestMapTableLoader");
+
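+    // Table schema: a single map column m1 with string values. The storage spec
+    // "[m1#{a}]" appears to split map key "a" into its own column group, which the
+    // map-key projections in the tests below rely on.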
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "m1:map(string)", "[m1#{a}]", false, conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("a", "x");
+        map.put("b", "y");
+        map.put("c", "z");
+        tuple.set(0, map);
+
+        try {
+          inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+        } catch (Exception e) {
+          System.out.println(e.getMessage());
+        }
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  @Test
+  public void test1() throws IOException, ParseException {
+    String projection = new String("m1#{b}");
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+    // HashMap<String, Object> mapval;
+    while (!scanner.atEnd()) {
+      scanner.getKey(key);
+      // Assert.assertEquals(key, new BytesWritable("key0".getBytes()));
+      scanner.getValue(value);
+      System.out.println("key = " + key + " value = " + value);
+
+      // mapval = (HashMap<String, Object>) value.get(0);
+      // Assert.assertEquals("x", mapval.get("a"));
+      // Assert.assertEquals(null, mapval.get("b"));
+      // Assert.assertEquals(null, mapval.get("c"));
+      scanner.advance();
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testReader() throws ExecException, IOException {
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{a}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestMapTableStorer {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+
+    pathTable = new Path(pathWorking, "TestMapTableStorer");
+    System.out.println("table path=" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "m:map(string)", "[m#{a}]", false, conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("a", "x");
+        map.put("b", "y");
+        map.put("c", "z");
+        tuple.set(0, map);
+
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    System.out.println("testStorer");
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+
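+    // Store back with map schema "m:map(string)"; the "[m#{a|b}]" storage spec
+    // appears to group map keys "a" and "b" together in one column group.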
+    pigServer
+        .store("records", new Path(pathTable, "store").toString(),
+            TableStorer.class.getCanonicalName()
+                + "('m:map(string)', '[m#{a|b}]')");
+  }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapType.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapType.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapType.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.record.Record;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on complicated column types.
+ * 
+ */
+public class TestMapType {
+  final static String STR_SCHEMA = "m1:map(string),m2:map(map(int)), m4:map(map(record(f1:int,f2:string)))";
+  final static String STR_STORAGE = "[m1#{a}];[m2#{x|y}]; [m1#{b}, m2#{z}]; [m4#{a4}]; [m4#{b4|c4}]";
+
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path path;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    System.out.println("ONCE SETUP !! ---------");
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    // path = new Path(pathWorking, this.getClass().getSimpleName());
+    path = fs.getWorkingDirectory();
+    System.out.println("path =" + path);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    Tuple record1;
+    try {
+      record1 = TypesUtils.createTuple(new Schema("f1:int, f2:string"));
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple record2;
+    try {
+      record2 = TypesUtils.createTuple(new Schema("f1:int, f2:string"));
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple record3;
+    try {
+      record3 = TypesUtils.createTuple(new Schema("f1:int, f2:string"));
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    // add data to row 1
+    // m1:map(string)
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put("a", "A");
+    m1.put("b", "B");
+    m1.put("c", "C");
+    tuple.set(0, m1);
+
+    // m2:map(map(int))
+    HashMap<String, Map> m2 = new HashMap<String, Map>();
+    Map<String, Integer> m31 = new HashMap<String, Integer>();
+    m31.put("m311", 311);
+    m31.put("m321", 321);
+    m31.put("m331", 331);
+    Map<String, Integer> m32 = new HashMap<String, Integer>();
+    m32.put("m411", 411);
+    m32.put("m421", 421);
+    m32.put("m431", 431);
+    m2.put("x", m31);
+    m2.put("y", m32);
+    tuple.set(1, m2);
+
+    // m4:map(map(record(f1:int,f2:string)))
+    record1.set(0, 11);
+    record1.set(1, "record row 1.1");
+    Map<String, Tuple> m51 = new HashMap<String, Tuple>();
+    Map<String, Tuple> m52 = new HashMap<String, Tuple>();
+    Map<String, Tuple> m53 = new HashMap<String, Tuple>();
+    m51.put("ma4", (Tuple) record1);
+    m52.put("ma41", (Tuple) record1);
+    m53.put("ma43", (Tuple) record1);
+
+    record2.set(0, 12);
+    record2.set(1, "record row 1.2");
+    m51.put("mb4", (Tuple) record2);
+    m52.put("mb42", (Tuple) record2);
+    m53.put("ma43", (Tuple) record2);
+    System.out.println("record1-1: " + record1.toString());
+
+    record3.set(0, 13);
+    record3.set(1, "record row 1.3");
+    System.out.println("record1-3: " + record1.toString());
+
+    m51.put("mc4", (Tuple) record3);
+    m52.put("mc42", (Tuple) record3);
+    m53.put("ma43", (Tuple) record3);
+
+    Map<String, Map> m4 = new HashMap<String, Map>();
+    m4.put("a4", m51);
+    m4.put("b4", m52);
+    m4.put("c4", m53);
+    m4.put("d4", m53);
+    m4.put("ma43", m53);
+
+    tuple.set(2, m4);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // row 2
+    row++;
+    TypesUtils.resetTuple(tuple);
+    TypesUtils.resetTuple(record1);
+    TypesUtils.resetTuple(record2);
+    TypesUtils.resetTuple(record3);
+    m1.clear();
+    m2.clear();
+    m31.clear();
+    m32.clear();
+    m4.clear();
+    m51.clear();
+    m52.clear();
+    m53.clear();
+    // m1:map(string)
+    m1.put("a", "A2");
+    m1.put("b2", "B2");
+    m1.put("c2", "C2");
+    tuple.set(0, m1);
+
+    // m2:map(map(int))
+    m31.put("m321", 321);
+    m31.put("m322", 322);
+    m31.put("m323", 323);
+    m2.put("z", m31);
+    tuple.set(1, m2);
+
+    // m4:map(map(record(f1:int,f2:string)))
+    record1.set(0, 21);
+    record1.set(1, "record row 2.1");
+    m51.put("ma4", (Tuple) record1);
+    m52.put("ma41", (Tuple) record1);
+    m53.put("ma43", (Tuple) record1);
+
+    record2.set(0, 22);
+    record2.set(1, "record row 2.2");
+    m51.put("mb4", (Tuple) record2);
+    m52.put("mb42", (Tuple) record2);
+    m53.put("ma43", (Tuple) record2);
+
+    record3.set(0, 33);
+    record3.set(1, "record row 3.3");
+    m51.put("mc4", (Tuple) record3);
+    m52.put("mc42", (Tuple) record3);
+    m53.put("ma43", (Tuple) record3);
+
+    m4.put("a4", m51);
+    m4.put("b4", m52);
+
+    m4.put("ma43", m53);
+
+    tuple.set(2, m4);
+
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // finish building table, closing out the inserter, writer, writer1
+    inserter.close();
+    writer1.finish();
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  // read one map
+  @Test
+  public void testReadSimpleMap() throws IOException, ParseException {
+
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{a}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    BytesWritable key = new BytesWritable();
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Assert.assertEquals("{a=A}", RowValue.get(0).toString());
+      }
+      if (row == 2) {
+        Assert.assertEquals("{a=A2}", RowValue.get(0).toString());
+      }
+    }
+  }
+
+  @Test
+  public void testReadMapOfMap() throws IOException, ParseException {
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{b}, m2#{x|z}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    BytesWritable key = new BytesWritable();
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Assert.assertEquals("B", ((Map) RowValue.get(0)).get("b"));
+        Assert.assertEquals(321, ((Map) ((Map) RowValue.get(1)).get("x"))
+            .get("m321"));
+        Assert.assertEquals(311, ((Map) ((Map) RowValue.get(1)).get("x"))
+            .get("m311"));
+        Assert.assertEquals(331, ((Map) ((Map) RowValue.get(1)).get("x"))
+            .get("m331"));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("x"))
+            .get("m341"));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("z")));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("a")));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("c")));
+      }
+      if (row == 2) {
+        Assert.assertEquals(null, ((Map) RowValue.get(0)).get("b"));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("x")));
+        Assert.assertEquals(323, ((Map) ((Map) RowValue.get(1)).get("z"))
+            .get("m323"));
+        Assert.assertEquals(322, ((Map) ((Map) RowValue.get(1)).get("z"))
+            .get("m322"));
+        Assert.assertEquals(321, ((Map) ((Map) RowValue.get(1)).get("z"))
+            .get("m321"));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("a")));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("b")));
+        Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("a")));
+
+      }
+    }
+
+  }
+
+  @Test
+  public void testReadMapOfRecord1() throws IOException, ParseException {
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{b}, m4#{a4|c4}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    BytesWritable key = new BytesWritable();
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Assert.assertEquals("B", ((Map) RowValue.get(0)).get("b"));
+        Assert.assertEquals(11, ((Tuple) ((Map) ((Map) RowValue.get(1))
+            .get("a4")).get("ma4")).get(0));
+        Assert.assertEquals("record row 1.1", ((Tuple) ((Map) ((Map) RowValue
+            .get(1)).get("a4")).get("ma4")).get(1));
+        Assert.assertEquals(12, ((Tuple) ((Map) ((Map) RowValue.get(1))
+            .get("a4")).get("mb4")).get(0));
+        Assert.assertEquals("record row 1.2", ((Tuple) ((Map) ((Map) RowValue
+            .get(1)).get("a4")).get("mb4")).get(1));
+        Assert.assertEquals(13, ((Tuple) ((Map) ((Map) RowValue.get(1))
+            .get("a4")).get("mc4")).get(0));
+        Assert.assertEquals("record row 1.3", ((Tuple) ((Map) ((Map) RowValue
+            .get(1)).get("a4")).get("mc4")).get(1));
+
+        Assert.assertEquals(13, ((Tuple) ((Map) ((Map) RowValue.get(1))
+            .get("c4")).get("ma43")).get(0));
+        Assert.assertEquals("record row 1.3", ((Tuple) ((Map) ((Map) RowValue
+            .get(1)).get("c4")).get("ma43")).get(1));
+
+        Assert.assertEquals(null, (((Map) ((Map) RowValue.get(1)).get("c4"))
+            .get("mc4")));
+        Assert.assertEquals(null, (((Map) ((Map) RowValue.get(1)).get("c4"))
+            .get("mb4")));
+
+      }
+      if (row == 2) {
+        Assert.assertEquals(21, ((Tuple) ((Map) ((Map) RowValue.get(1))
+            .get("a4")).get("ma4")).get(0));
+        Assert.assertEquals("record row 2.1", ((Tuple) ((Map) ((Map) RowValue
+            .get(1)).get("a4")).get("ma4")).get(1));
+        Assert.assertEquals(22, ((Tuple) ((Map) ((Map) RowValue.get(1))
+            .get("a4")).get("mb4")).get(0));
+        Assert.assertEquals("record row 2.2", ((Tuple) ((Map) ((Map) RowValue
+            .get(1)).get("a4")).get("mb4")).get(1));
+        Assert.assertEquals(33, ((Tuple) ((Map) ((Map) RowValue.get(1))
+            .get("a4")).get("mc4")).get(0));
+        Assert.assertEquals("record row 3.3", ((Tuple) ((Map) ((Map) RowValue
+            .get(1)).get("a4")).get("mc4")).get(1));
+
+      }
+    }
+
+  }
+
+  @Test
+  // Positive: read the whole map column m1 through the pig loader
+  public void testRead4() throws IOException, ParseException {
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1');";
+
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Assert.assertEquals("A", ((Map) RowValue.get(0)).get("a"));
+        Assert.assertEquals("B", ((Map) RowValue.get(0)).get("b"));
+        Assert.assertEquals("C", ((Map) RowValue.get(0)).get("c"));
+        Assert.assertEquals(null, ((Map) RowValue.get(0)).get("non-existent"));
+        Assert.assertEquals(null, ((Map) RowValue.get(0)).get("b2"));
+        Assert.assertEquals(null, ((Map) RowValue.get(0)).get("c2"));
+      }
+      if (row == 2) {
+        Assert.assertEquals("A2", ((Map) RowValue.get(0)).get("a"));
+        Assert.assertEquals("B2", ((Map) RowValue.get(0)).get("b2"));
+        Assert.assertEquals("C2", ((Map) RowValue.get(0)).get("c2"));
+        Assert.assertEquals(null, ((Map) RowValue.get(0)).get("non-existent"));
+        Assert.assertEquals(null, ((Map) RowValue.get(0)).get("b"));
+        Assert.assertEquals(null, ((Map) RowValue.get(0)).get("c"));
+      }
+    }
+    Assert.assertEquals(2, row);
+  }
+
+  @Test
+  // Negative: non-existent column through the pig loader
+  public void testReadNeg1() throws IOException, ParseException {
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m5');";
+
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Assert.assertEquals(false, RowValue.isNull());
+        Assert.assertEquals(null, RowValue.get(0));
+        Assert.assertEquals(1, RowValue.size());
+      }
+    }
+    Assert.assertEquals(2, row);
+  }
+
+  @Test
+  // non-existent column name through I/O
+  public void testNeg2() throws IOException, ParseException {
+    String projection = new String("m5");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(false, RowValue.isNull());
+    Assert.assertEquals(null, RowValue.get(0));
+    Assert.assertEquals(1, RowValue.size());
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(false, RowValue.isNull());
+    Assert.assertEquals(null, RowValue.get(0));
+    Assert.assertEquals(1, RowValue.size());
+    reader.close();
+  }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,322 @@
+package org.apache.hadoop.zebra.pig;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on complicated column types.
+ * 
+ */
+public class TestMixedType1 {
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)), m1:map(string),m2:map(map(int)), c:collection(f13:double, f14:float, f15:bytes)";
+  final static String STR_STORAGE = "[s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3]; [s5, s6, m2#{x|y}]; [r1.f2, m1#{b}]; [r2.r3.f4, m2#{z}]";
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path path;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    System.out.println("ONCE SETUP !! ---------");
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    // path = new Path(pathWorking, this.getClass().getSimpleName());
+    path = fs.getWorkingDirectory();
+    System.out.println("path =" + path);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    Tuple tupRecord1;
+    try {
+      tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple tupRecord2;
+    try {
+      tupRecord2 = TypesUtils.createTuple(schema.getColumnSchema("r2")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple tupRecord3;
+    try {
+      tupRecord3 = TypesUtils.createTuple(new Schema("f3:float, f4"));
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    // row 1
+    tuple.set(0, true); // bool
+    tuple.set(1, 1); // int
+    tuple.set(2, 1001L); // long
+    tuple.set(3, 1.1); // float
+    tuple.set(4, "hello world 1"); // string
+    tuple.set(5, new DataByteArray("hello byte 1")); // byte
+
+    // r1:record(f1:int, f2:long)
+    tupRecord1.set(0, 1);
+    tupRecord1.set(1, 1001L);
+    tuple.set(6, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 1.3);
+    tupRecord3.set(1, new DataByteArray("r3 row 1 byte array "));
+    tuple.set(7, tupRecord2);
+
+    // m1:map(string)
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put("a", "A");
+    m1.put("b", "B");
+    m1.put("c", "C");
+    tuple.set(8, m1);
+
+    // m2:map(map(int))
+    HashMap<String, Map> m2 = new HashMap<String, Map>();
+    Map<String, Integer> m3 = new HashMap<String, Integer>();
+    m3.put("m311", 311);
+    m3.put("m321", 321);
+    m3.put("m331", 331);
+    Map<String, Integer> m4 = new HashMap<String, Integer>();
+    m4.put("m411", 411);
+    m4.put("m421", 421);
+    m4.put("m431", 431);
+    m2.put("x", m3);
+    m2.put("y", m4);
+    tuple.set(9, m2);
+
+    // c:collection(f13:double, f14:float, f15:bytes)
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(10).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(10, bagColl);
+
+    int row = 0;
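+    // row keys follow the pattern "k<part+1><row+1>", e.g. "k11" for the first row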
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // row 2
+    row++;
+    TypesUtils.resetTuple(tuple);
+    TypesUtils.resetTuple(tupRecord1);
+    TypesUtils.resetTuple(tupRecord2);
+    TypesUtils.resetTuple(tupRecord3);
+    m1.clear();
+    m2.clear();
+    m3.clear();
+    m4.clear();
+    tuple.set(0, false);
+    tuple.set(1, 2); // int
+    tuple.set(2, 1002L); // long
+    tuple.set(3, 3.1); // float
+    tuple.set(4, "hello world 2"); // string
+    tuple.set(5, new DataByteArray("hello byte 2")); // byte
+
+    // r1:record(f1:int, f2:long)
+    tupRecord1.set(0, 2);
+    tupRecord1.set(1, 1002L);
+    tuple.set(6, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 2.3);
+    tupRecord3.set(1, new DataByteArray("r3 row2  byte array"));
+    tuple.set(7, tupRecord2);
+
+    // m1:map(string)
+    m1.put("a2", "A2");
+    m1.put("b2", "B2");
+    m1.put("c2", "C2");
+    tuple.set(8, m1);
+
+    // m2:map(map(int))
+    m3.put("m321", 321);
+    m3.put("m322", 322);
+    m3.put("m323", 323);
+    m2.put("z", m3);
+    tuple.set(9, m2);
+
+    // c:collection(f13:double, f14:float, f15:bytes)
+    bagColl.clear();
+    TypesUtils.resetTuple(tupColl1);
+    TypesUtils.resetTuple(tupColl2);
+    tupColl1.set(0, 7654.321);
+    tupColl1.set(1, 0.0001);
+    abs1[0] = 31;
+    abs1[1] = 32;
+    abs1[2] = 33;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 0.123456789);
+    tupColl2.set(1, 0.3333);
+    abs2[0] = 41;
+    abs2[1] = 42;
+    abs2[2] = 43;
+    abs2[3] = 44;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(10, bagColl);
+
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  @Test
+  public void test1() throws IOException, ParseException {
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('r1.f2, s1');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    BytesWritable key = new BytesWritable();
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Assert.assertEquals(1001L, RowValue.get(0));
+        Assert.assertEquals(true, RowValue.get(1));
+      }
+      if (row == 2) {
+        Assert.assertEquals(1002L, RowValue.get(0));
+        Assert.assertEquals(false, RowValue.get(1));
+      }
+    }
+  }
+
+  @Test
+  public void testStitch() throws IOException, ParseException {
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('s1, r1');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    BytesWritable key = new BytesWritable();
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Tuple recordTuple = (Tuple) RowValue.get(1);
+        Assert.assertEquals(1, recordTuple.get(0));
+        Assert.assertEquals(1001L, recordTuple.get(1));
+        Assert.assertEquals(true, RowValue.get(0));
+      }
+
+    }
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestRealCluster.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestRealCluster {
+
+  public static void main(String[] args) throws IOException, ExecException,
+      Exception {
+
+    Configuration conf = new Configuration();
+    PigServer pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+        .toProperties(conf));
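+    // The table path and jar location below are specific to the original
+    // author's cluster; adjust them before running against another setup.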
+    String pp = "/user/harmeek/outputdata1/t1";
+    pigServer
+        .registerJar("/homes/harmeek/july14_investigation/pig-zebra-jul14.jar");
+    String query = "records = LOAD '" + pp.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+
+    pigServer.shutdown();
+    /*
+     * Use pig STORE to store testing data
+     */
+    /*
+     * pigServer.store("records", new Path(pathTable, "store").toString(),
+     * TableStorer.class.getCanonicalName() +
+     * "('SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g', '[SF_a, SF_b, SF_c]; [SF_e]')" );
+     */
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections and storage on simple column types.
+ * 
+ */
+public class TestSimpleType {
+
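+  // Six scalar columns stored as three column groups; each bracketed group
+  // in STR_STORAGE becomes a separate physical column group in the table.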
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+  final static String STR_STORAGE = "[s1, s2]; [s3, s4]; [s5, s6]";
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path path;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    System.out.println("ONCE SETUP !! ---------");
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    // path = new Path(pathWorking, this.getClass().getSimpleName());
+    path = fs.getWorkingDirectory();
+    System.out.println("path =" + path);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    Schema schema = writer.getSchema();
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+
+    Tuple tuple = TypesUtils.createTuple(schema);
+    TypesUtils.resetTuple(tuple);
+
+    // insert data in row 1
+    int row = 0;
+    tuple.set(0, true); // bool
+    tuple.set(1, 1); // int
+    tuple.set(2, 1001L); // long
+    tuple.set(3, 1.1); // float
+    tuple.set(4, "hello world 1"); // string
+    tuple.set(5, new DataByteArray("hello byte 1")); // byte
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    row++;
+    TypesUtils.resetTuple(tuple);
+
+    tuple.set(0, false);
+    tuple.set(1, 2); // int
+    tuple.set(2, 1002L); // long
+    tuple.set(3, 3.1); // float
+    tuple.set(4, "hello world 2"); // string
+    tuple.set(5, new DataByteArray("hello byte 2")); // byte
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    inserter.close();
+    writer1.finish();
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  /**
+   * Return the name of the routine that called getCurrentMethodName
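+   * (used by the STORE tests below to derive a per-test output path).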
+   * 
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+    return t;
+  }
+
+  // @Test
+  public void testReadSimpleStitch() throws IOException, ParseException {
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('s5,s1');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    BytesWritable key = new BytesWritable();
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        Assert.assertEquals("hello world 1", RowValue.get(0));
+        Assert.assertEquals(true, RowValue.get(1));
+      }
+      if (row == 2) {
+        Assert.assertEquals("hello world 2", RowValue.get(0));
+        Assert.assertEquals(false, RowValue.get(1));
+      }
+    }
+  }
+
+  // @Test
+  // Test reader
+  public void testReadSimple1() throws IOException, ParseException {
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('s6,s5,s4,s3,s2,s1');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    BytesWritable key = new BytesWritable();
+    int row = 0;
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+      row++;
+      if (row == 1) {
+        // Assert.assertEquals(key, new
+        // BytesWritable("k11".getBytes()));
+        Assert.assertEquals(true, RowValue.get(5));
+        Assert.assertEquals(1, RowValue.get(4));
+        Assert.assertEquals(1001L, RowValue.get(3));
+        Assert.assertEquals(1.1, RowValue.get(2));
+        Assert.assertEquals("hello world 1", RowValue.get(1));
+        Assert.assertEquals("hello byte 1", RowValue.get(0).toString());
+      }
+      if (row == 2) {
+        Assert.assertEquals(false, RowValue.get(5));
+        Assert.assertEquals(2, RowValue.get(4));
+        Assert.assertEquals(1002L, RowValue.get(3));
+        Assert.assertEquals(3.1, RowValue.get(2));
+        Assert.assertEquals("hello world 2", RowValue.get(1));
+        Assert.assertEquals("hello byte 2", RowValue.get(0).toString());
+      }
+    }
+  }
+
+  // @Test
+  // Negative test: project a non-existent field in the loader
+  public void testRead2() throws IOException, ParseException {
+    try {
+      String query = "records = LOAD '" + path.toString()
+          + "' USING org.apache.hadoop.zebra.pig.TableLoader('s7');";
+      pigServer.registerQuery(query);
+      pigServer.openIterator("records");
+      Assert.fail("Projection should not accept a non-existent field");
+    } catch (Exception e) {
+      System.out.println(e);
+    }
+
+  }
+
+  @Test
+  // Store same table
+  public void testStorer() throws ExecException, IOException {
+    /*
+     * Use pig LOAD to load testing data for store
+     */
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+    }
+
+    /*
+     * Use pig STORE to store testing data
+     */
+    Path newPath = new Path(getCurrentMethodName());
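+    // TableStorer's two constructor arguments are the output schema and the
+    // storage hint that assigns the stored columns to column groups.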
+    pigServer
+        .store(
+            "records",
+            new Path(newPath, "store").toString(),
+            TableStorer.class.getCanonicalName()
+                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+
+  }
+
+  @Test
+  // Store filtered records (the second row of the previous table)
+  public void testStorer2() throws ExecException, IOException {
+    // Load original table
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+
+    // filter the original table
+    String query2 = "newRecord = FILTER records BY (s2 >= 2);";
+    pigServer.registerQuery(query2);
+
+    // store the new records to new table
+    Path newPath = new Path(getCurrentMethodName());
+    pigServer
+        .store(
+            "newRecord",
+            newPath.toString(),
+            TableStorer.class.getCanonicalName()
+                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+
+    // check new table content
+    String query3 = "newRecords = LOAD '"
+        + newPath.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('s6,s5,s4,s3,s2,s1');";
+    System.out.println(query3);
+    // newRecords = LOAD
+    // 'org.apache.hadoop.zebra.pig.TestSimpleType.testStorer2' USING
+    // org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);
+    pigServer.registerQuery(query3);
+
+    Iterator<Tuple> it3 = pigServer.openIterator("newRecords");
+    // BytesWritable key2 = new BytesWritable();
+    int row = 0;
+    Tuple RowValue2 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue2 = it3.next();
+      row++;
+      if (row == 1) {
+        Assert.assertEquals(false, RowValue2.get(5));
+        Assert.assertEquals(2, RowValue2.get(4));
+        Assert.assertEquals(1002L, RowValue2.get(3));
+        Assert.assertEquals(3.1, RowValue2.get(2));
+        Assert.assertEquals("hello world 2", RowValue2.get(1));
+        Assert.assertEquals("hello byte 2", RowValue2.get(0).toString());
+      }
+    }
+    Assert.assertEquals(1, row);
+  }
+
+  @Test
+  // Store filtered records when the storage hint is empty
+  public void testStorer3() throws ExecException, IOException {
+    // Load original table
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+
+    // filter the original table
+    String query2 = "newRecord = FILTER records BY (s2 >= 2);";
+    pigServer.registerQuery(query2);
+
+    // store the new records to new table
+    Path newPath = new Path(getCurrentMethodName());
+    pigServer.store("newRecord", newPath.toString(), TableStorer.class
+        .getCanonicalName()
+        + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '')");
+
+    // check new table content
+    String query3 = "newRecords = LOAD '"
+        + newPath.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('s6,s5,s4,s3,s2,s1');";
+    System.out.println(query3);
+    // newRecords = LOAD
+    // 'org.apache.hadoop.zebra.pig.TestSimpleType.testStorer2' USING
+    // org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);
+    pigServer.registerQuery(query3);
+
+    Iterator<Tuple> it3 = pigServer.openIterator("newRecords");
+    // BytesWritable key2 = new BytesWritable();
+    int row = 0;
+    Tuple RowValue2 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue2 = it3.next();
+      row++;
+      if (row == 1) {
+        Assert.assertEquals(false, RowValue2.get(5));
+        Assert.assertEquals(2, RowValue2.get(4));
+        Assert.assertEquals(1002L, RowValue2.get(3));
+        Assert.assertEquals(3.1, RowValue2.get(2));
+        Assert.assertEquals("hello world 2", RowValue2.get(1));
+        Assert.assertEquals("hello byte 2", RowValue2.get(0).toString());
+      }
+    }
+    Assert.assertEquals(1, row);
+  }
+
+  @Test
+  // Store filtered records when the column group spec is empty ("[]")
+  public void testStorer4() throws ExecException, IOException {
+    // Load original table
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+
+    // filter the original table
+    String query2 = "newRecord = FILTER records BY (s2 >= 2);";
+    pigServer.registerQuery(query2);
+
+    // store the new records to new table
+    Path newPath = new Path(getCurrentMethodName());
+    pigServer.store("newRecord", newPath.toString(), TableStorer.class
+        .getCanonicalName()
+        + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[]')");
+
+    // check new table content
+    String query3 = "newRecords = LOAD '"
+        + newPath.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('s6,s5,s4,s3,s2,s1');";
+    System.out.println(query3);
+    // newRecords = LOAD
+    // 'org.apache.hadoop.zebra.pig.TestSimpleType.testStorer2' USING
+    // org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);
+    pigServer.registerQuery(query3);
+
+    Iterator<Tuple> it3 = pigServer.openIterator("newRecords");
+    // BytesWritable key2 = new BytesWritable();
+    int row = 0;
+    Tuple RowValue2 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue2 = it3.next();
+      row++;
+      if (row == 1) {
+        Assert.assertEquals(false, RowValue2.get(5));
+        Assert.assertEquals(2, RowValue2.get(4));
+        Assert.assertEquals(1002L, RowValue2.get(3));
+        Assert.assertEquals(3.1, RowValue2.get(2));
+        Assert.assertEquals("hello world 2", RowValue2.get(1));
+        Assert.assertEquals("hello byte 2", RowValue2.get(0).toString());
+      }
+    }
+    Assert.assertEquals(1, row);
+  }
+
+  @Test
+  // Negative: schema description has fewer columns than the input tuple
+  public void testStorerNegative1() throws ExecException, IOException {
+
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+    }
+
+    Path newPath = new Path(getCurrentMethodName());
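+    // Pig surfaces the store failure through the returned ExecJob rather than
+    // throwing here, so the test asserts on pigJob.getException().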
+    ExecJob pigJob = pigServer
+        .store(
+            "records",
+            new Path(newPath, "store").toString(),
+            TableStorer.class.getCanonicalName()
+                + "('s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+    Assert.assertNotNull(pigJob.getException());
+    System.out.println(pigJob.getException());
+  }
+
+  @Test
+  // Negative: storage hint places the same column in two column groups
+  public void testStorerNegative2() throws ExecException, IOException {
+
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+    }
+
+    Path newPath = new Path(getCurrentMethodName());
+
+    ExecJob pigJob = pigServer
+        .store(
+            "records",
+            new Path(newPath, "store").toString(),
+            TableStorer.class.getCanonicalName()
+                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s1, s4]')");
+    Assert.assertNotNull(pigJob.getException());
+    System.out.println(pigJob.getException());
+  }
+
+  @Test
+  // Negative: storage hint repeats the same column group
+  public void testStorerNegative3() throws ExecException, IOException {
+
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+    }
+
+    Path newPath = new Path(getCurrentMethodName());
+
+    ExecJob pigJob = pigServer
+        .store(
+            "records",
+            new Path(newPath, "store").toString(),
+            TableStorer.class.getCanonicalName()
+                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1]; [s1]')");
+    Assert.assertNotNull(pigJob.getException());
+    System.out.println(pigJob.getException());
+  }
+
+  // @Test
+  // Negative: schema description declares different data types than the
+  // input tuple
+  public void testStorerNegative4() throws ExecException, IOException {
+
+    String query = "records = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple RowValue = it.next();
+      System.out.println(RowValue);
+    }
+
+    Path newPath = new Path(getCurrentMethodName());
+    ExecJob pigJob = pigServer
+        .store(
+            "records",
+            new Path(newPath, "store").toString(),
+            TableStorer.class.getCanonicalName()
+                + "('s1:int, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+    Assert.assertNotNull(pigJob.getException());
+    System.out.println(pigJob.getException());
+  }
+
+  @Test
+  // Negative: storing back to the source table's path should fail
+  public void testStorer5() throws ExecException, IOException {
+    /*
+     * Use pig LOAD to load testing data for store
+     */
+    String query = "records = LOAD '"
+        + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader() as (s1,s2,s3,s4,s5,s6);";
+    pigServer.registerQuery(query);
+
+    /*
+     * Use pig STORE to store testing data
+     */
+    
+    ExecJob pigJob = pigServer
+        .store(
+            "records",
+            path.toString(),
+            TableStorer.class.getCanonicalName()
+                + "('s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes', '[s1, s2]; [s3, s4]')");
+    Assert.assertNotNull(pigJob.getException());
+    System.out.println(pigJob.getException());
+  }
+
+}