You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC

svn commit: r803312 [9/16] - in /hadoop/pig/trunk: ./ contrib/zebra/ contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/ contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/ contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/s...

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,1021 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on complicated column types.
+ * 
+ */
+public class TestCollection {
+
+  // Schema string handed to BasicTable.Writer in setUpOnce(): three collection
+  // columns named c, c2 and c3, which the tests below project in various ways.
+  final static String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+  // Storage string handed to BasicTable.Writer together with STR_SCHEMA.
+  final static String STR_STORAGE = "[c]";
+  // Assigned once in setUpOnce() and then used by the tests and by tearDown().
+  private static Configuration conf;
+  // Location of the test table: written in setUpOnce(), dropped in tearDown().
+  private static Path path;
+  // Assigned in setUpOnce(); first a LocalFileSystem, then the file system
+  // resolved from path.
+  private static FileSystem fs;
+
+  /**
+   * Builds the table shared by the tests in this class.
+   *
+   * <p>
+   * The table lives at {@code <working dir>/TestCollection}, is created with
+   * {@link #STR_SCHEMA} / {@link #STR_STORAGE}, and receives two rows (keys
+   * {@code k11} and {@code k12}) through a single inserter named
+   * {@code part0}. Every row carries three bags:
+   * <ul>
+   * <li>column {@code c}: two tuples (a, b, c)</li>
+   * <li>column {@code c2}: two tuples (r1, d) where r1 is itself a tuple (f1,
+   * f2)</li>
+   * <li>column {@code c3}: four tuples (e, f), added through two intermediate
+   * bags of two tuples each</li>
+   * </ul>
+   * The tuple and bag objects are reused for the second row, so the order of
+   * the clear/reset/set/insert statements below matters.
+   *
+   * @throws IOException
+   *           if the table cannot be written, or if one of the hard-coded
+   *           schema strings fails to parse
+   */
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), "TestCollection");
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    // c:collection(a:double, b:float, c:bytes)
+    DataBag bag1 = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(0).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+
+    // c2:collection(r1:record(f1:int, f2:string), d:string)
+    DataBag bag2 = TypesUtils.createBag();
+    Schema schColl2 = schema.getColumn(1).getSchema();
+    Tuple tupColl2_1 = TypesUtils.createTuple(schColl2);
+    Tuple tupColl2_2 = TypesUtils.createTuple(schColl2);
+    Tuple collRecord1 = createTupleForSchema("f1:int, f2:string");
+    Tuple collRecord2 = createTupleForSchema("f1:int, f2:string");
+
+    // c3:collection(c3_1:collection(e:int,f:bool))
+    DataBag bag3 = TypesUtils.createBag();
+    DataBag bag3_1 = TypesUtils.createBag();
+    DataBag bag3_2 = TypesUtils.createBag();
+    Tuple tupColl3_1 = createTupleForSchema("e:int,f:bool");
+    Tuple tupColl3_2 = createTupleForSchema("e:int,f:bool");
+    Tuple tupColl3_3 = createTupleForSchema("e:int,f:bool");
+    Tuple tupColl3_4 = createTupleForSchema("e:int,f:bool");
+
+    // ---- row 1 (key "k11") ----
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bag1.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bag1.add(tupColl2);
+    tuple.set(0, bag1);
+
+    collRecord1.set(0, 1);
+    collRecord1.set(1, "record1_string1");
+    tupColl2_1.set(0, collRecord1);
+    tupColl2_1.set(1, "hello1");
+    bag2.add(tupColl2_1);
+
+    collRecord2.set(0, 2);
+    collRecord2.set(1, "record2_string1");
+    tupColl2_2.set(0, collRecord2);
+    tupColl2_2.set(1, "hello2");
+    bag2.add(tupColl2_2);
+    tuple.set(1, bag2);
+
+    TypesUtils.resetTuple(tupColl3_1);
+    TypesUtils.resetTuple(tupColl3_2);
+    tupColl3_1.set(0, 1);
+    tupColl3_1.set(1, true);
+    tupColl3_2.set(0, 2);
+    tupColl3_2.set(1, false);
+    bag3_1.add(tupColl3_1);
+    bag3_1.add(tupColl3_2);
+    bag3.addAll(bag3_1);
+
+    tupColl3_3.set(0, 3);
+    tupColl3_3.set(1, true);
+    tupColl3_4.set(0, 4);
+    tupColl3_4.set(1, false);
+    bag3_2.add(tupColl3_3);
+    bag3_2.add(tupColl3_4);
+    bag3.addAll(bag3_2);
+    tuple.set(2, bag3);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    row++;
+
+    // ---- row 2 (key "k12"): empty the reused bags and tuples first ----
+    bag1.clear();
+    bag2.clear();
+    bag3.clear();
+    bag3_1.clear();
+    bag3_2.clear();
+    TypesUtils.resetTuple(tupColl1);
+    TypesUtils.resetTuple(tupColl2);
+    TypesUtils.resetTuple(tupColl2_1);
+    TypesUtils.resetTuple(tupColl2_2);
+    TypesUtils.resetTuple(collRecord1);
+    TypesUtils.resetTuple(collRecord2);
+    TypesUtils.resetTuple(tupColl3_1);
+    TypesUtils.resetTuple(tupColl3_2);
+    TypesUtils.resetTuple(tupColl3_3);
+    TypesUtils.resetTuple(tupColl3_4);
+
+    tupColl1.set(0, 7654.321);
+    tupColl1.set(1, 0.0001);
+    abs1[0] = 31;
+    abs1[1] = 32;
+    abs1[2] = 33;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bag1.add(tupColl1);
+    tupColl2.set(0, 0.123456789);
+    tupColl2.set(1, 0.3333);
+    abs2[0] = 41;
+    abs2[1] = 42;
+    abs2[2] = 43;
+    abs2[3] = 44;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bag1.add(tupColl2);
+    tuple.set(0, bag1);
+
+    collRecord1.set(0, 3);
+    collRecord1.set(1, "record1_string2");
+    tupColl2_1.set(0, collRecord1);
+    tupColl2_1.set(1, "hello1_2");
+    bag2.add(tupColl2_1);
+
+    collRecord2.set(0, 4);
+    collRecord2.set(1, "record2_string2");
+    tupColl2_2.set(0, collRecord2);
+    tupColl2_2.set(1, "hello2_2");
+    bag2.add(tupColl2_2);
+    tuple.set(1, bag2);
+
+    tupColl3_1.set(0, 5);
+    tupColl3_1.set(1, true);
+    tupColl3_2.set(0, 6);
+    tupColl3_2.set(1, false);
+    bag3_1.add(tupColl3_1);
+    bag3_1.add(tupColl3_2);
+    bag3.addAll(bag3_1);
+
+    tupColl3_3.set(0, 7);
+    tupColl3_3.set(1, true);
+    tupColl3_4.set(0, 8);
+    tupColl3_4.set(1, false);
+    bag3_2.add(tupColl3_3);
+    bag3_2.add(tupColl3_4);
+    bag3.addAll(bag3_2);
+    tuple.set(2, bag3);
+
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  /**
+   * Creates a tuple for the given schema string via
+   * {@code TypesUtils.createTuple(new Schema(schemaString))}.
+   *
+   * @param schemaString
+   *          schema text to parse
+   * @return the tuple returned by {@code TypesUtils.createTuple}
+   * @throws IOException
+   *           if the tuple cannot be created; a schema parse failure is
+   *           wrapped so that it stays available as the cause
+   */
+  private static Tuple createTupleForSchema(String schemaString)
+      throws IOException {
+    try {
+      return TypesUtils.createTuple(new Schema(schemaString));
+    } catch (ParseException e) {
+      // The cause travels with the IOException, so no separate stack dump.
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Drops the table at {@code path} once the tests of this class have run.
+   *
+   * @throws IOException
+   *           if dropping the table fails
+   */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  // read one collection
+  // final static String STR_SCHEMA =
+  // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+  @Test
+  public void testRead1() throws IOException, ParseException {
+    String projection = new String("c");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    System.out.println("test read 1: row: " + RowValue.toString());
+    // test read 1: row: ({(3.1415926,1.6, ),(123.456789,100,)})
+    Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+    int list = 0;
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur.get(0));
+      list++;
+      if (list == 1) {
+        Assert.assertEquals(3.1415926, cur.get(0));
+        Assert.assertEquals(1.6, cur.get(1));
+        System.out
+            .println("byte 0: " + ((DataByteArray) cur.get(2)).toString());
+
+      }
+      if (list == 2) {
+        Assert.assertEquals(123.456789, cur.get(0));
+        Assert.assertEquals(100, cur.get(1));
+        // Assert.assertEquals(3.1415926, cur.get(2));
+      }
+    }
+    scanner.advance();
+    scanner.getValue(RowValue);
+    Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+    int list2 = 0;
+    while (it2.hasNext()) {
+      Tuple cur = it2.next();
+      System.out.println(cur.get(0));
+      list2++;
+      if (list2 == 1) {
+        Assert.assertEquals(7654.321, cur.get(0));
+        Assert.assertEquals(0.0001, cur.get(1));
+        // Assert.assertEquals(3.1415926, cur.get(2));
+      }
+      if (list2 == 2) {
+        Assert.assertEquals(0.123456789, cur.get(0));
+        Assert.assertEquals(0.3333, cur.get(1));
+        // Assert.assertEquals(3.1415926, cur.get(2));
+      }
+    }
+
+    reader.close();
+  }
+
+  // read second collection
+  // final static String STR_SCHEMA =
+  // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+  /**
+   * Projects the whole collection column {@code c2} and checks the nested
+   * record (f1, f2) and the string field of the first two tuples in each row.
+   *
+   * @throws IOException
+   *           if reading the table fails
+   * @throws ParseException
+   *           if the projection cannot be parsed
+   */
+  @Test
+  public void testRead2() throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection("c2");
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    Tuple row = TypesUtils.createTuple(scanner.getSchema());
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    scanner.getValue(row);
+    System.out.println("test read 2:row: " + row);
+    // recorded output:
+    // ({((1,record1_string1),hello1),((2,record2_string1),hello2)})
+    verifyRead2Bag((DataBag) row.get(0), new Object[][] {
+        { 1, "record1_string1", "hello1" },
+        { 2, "record2_string1", "hello2" } });
+
+    scanner.advance();
+    scanner.getValue(row);
+    verifyRead2Bag((DataBag) row.get(0), new Object[][] {
+        { 3, "record1_string2", "hello1_2" },
+        { 4, "record2_string2", "hello2_2" } });
+
+    reader.close();
+  }
+
+  /**
+   * Prints field 0 of every tuple in {@code bag} and, for the leading tuples
+   * that have an entry in {@code expected}, compares {f1, f2, d} against it.
+   */
+  private static void verifyRead2Bag(DataBag bag, Object[][] expected)
+      throws IOException {
+    int idx = 0;
+    for (Tuple cur : bag) {
+      System.out.println(cur.get(0));
+      if (idx < expected.length) {
+        Tuple record = (Tuple) cur.get(0);
+        Assert.assertEquals(expected[idx][0], record.get(0));
+        Assert.assertEquals(expected[idx][1], record.get(1));
+        Assert.assertEquals(expected[idx][2], cur.get(1));
+      }
+      idx++;
+    }
+  }
+
+  // read 3rd column
+  // final static String STR_SCHEMA =
+  // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+  /**
+   * Projects the whole collection column {@code c3} and checks both row keys
+   * ({@code k11}, {@code k12}) and all four (e, f) tuples of each row.
+   *
+   * <p>
+   * Besides the per-tuple values, the number of tuples in each bag is
+   * asserted, so that an empty or truncated bag fails the test instead of
+   * skipping the value checks.
+   *
+   * @throws IOException
+   *           if reading the table fails
+   * @throws ParseException
+   *           if the projection cannot be parsed
+   */
+  @Test
+  public void testRead3() throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection("c3");
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple row = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    // JUnit convention: expected value first, actual value second.
+    Assert.assertEquals(new BytesWritable("k11".getBytes()), key);
+    scanner.getValue(row);
+    System.out.println("test record 3: row: " + row);
+    // recorded output: ({(1,true),(2,false),(3,true),(4,false)})
+    verifyRead3Bag((DataBag) row.get(0), new Object[][] {
+        { 1, true }, { 2, false }, { 3, true }, { 4, false } });
+
+    scanner.advance();
+    scanner.getValue(row);
+    System.out.println("row: " + row);
+    // recorded output: ({(5,true),(6,false),(7,true),(8,false)})
+    scanner.getKey(key);
+    Assert.assertEquals(new BytesWritable("k12".getBytes()), key);
+    verifyRead3Bag((DataBag) row.get(0), new Object[][] {
+        { 5, true }, { 6, false }, { 7, true }, { 8, false } });
+
+    reader.close();
+  }
+
+  /**
+   * Prints field 0 of every tuple in {@code bag}, compares (e, f) of each
+   * tuple with the matching entry of {@code expected}, and finally asserts
+   * that the bag held exactly {@code expected.length} tuples.
+   */
+  private static void verifyRead3Bag(DataBag bag, Object[][] expected)
+      throws IOException {
+    int count = 0;
+    for (Tuple cur : bag) {
+      System.out.println(cur.get(0));
+      if (count < expected.length) {
+        Assert.assertEquals(expected[count][0], cur.get(0));
+        Assert.assertEquals(expected[count][1], cur.get(1));
+      }
+      count++;
+    }
+    Assert.assertEquals(expected.length, count);
+  }
+
+  // Negative test: project a column that does not exist
+  /**
+   * Projects {@code d}, which is not a top-level column of the schema, and
+   * expects a one-field, non-null row whose only field is {@code null}.
+   *
+   * <p>
+   * NOTE(review): the {@code x} prefix suggests this test was meant to be
+   * disabled, yet it carries {@code @Test} - confirm the intent.
+   *
+   * @throws IOException
+   *           if reading the table fails
+   * @throws ParseException
+   *           if the projection cannot be parsed
+   */
+  @Test
+  public void xtestReadNeg1() throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection("d");
+    TableScanner scanner = reader.getScanner(reader.rangeSplit(1).get(0), true);
+    Tuple row = TypesUtils.createTuple(scanner.getSchema());
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    scanner.getValue(row);
+
+    System.out.println("row: " + row);
+    System.out.println("row1: " + row.get(0));
+
+    Assert.assertEquals(false, row.isNull());
+    Assert.assertEquals(null, row.get(0));
+    Assert.assertEquals(1, row.size());
+
+    reader.close();
+  }
+
+  // read, should support project to 2nd level
+  /**
+   * Projects the second-level field {@code c.a} and checks, for the first two
+   * tuples of each row's bag, the value of field 0 and that reading field 1
+   * raises an exception.
+   *
+   * @throws IOException
+   *           if reading the table fails
+   * @throws ParseException
+   *           if the projection cannot be parsed
+   */
+  @Test
+  public void testRead5() throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection("c.a");
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    Tuple row = TypesUtils.createTuple(scanner.getSchema());
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    scanner.getValue(row);
+    System.out.println("test read 5: row: " + row);
+    // recorded output: ({(3.1415926),(123.456789)})
+    verifyRead5Bag((DataBag) row.get(0),
+        new Object[] { 3.1415926, 123.456789 });
+
+    scanner.advance();
+    scanner.getValue(row);
+    verifyRead5Bag((DataBag) row.get(0),
+        new Object[] { 7654.321, 0.123456789 });
+
+    reader.close();
+  }
+
+  /**
+   * Prints field 0 of every tuple in {@code bag}; for the leading tuples that
+   * have an entry in {@code expected}, compares field 0 with that entry and
+   * requires {@code get(1)} to throw.
+   */
+  private static void verifyRead5Bag(DataBag bag, Object[] expected)
+      throws IOException {
+    int idx = 0;
+    for (Tuple cur : bag) {
+      System.out.println(cur.get(0));
+      if (idx < expected.length) {
+        Assert.assertEquals(expected[idx], cur.get(0));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      idx++;
+    }
+  }
+
+  // read, should support project to 2nd level
+  // final static String STR_SCHEMA =
+  // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+  /**
+   * Projects the second-level record {@code c2.r1} and checks, for the first
+   * two tuples of each row's bag, the record's (f1, f2) values and that
+   * reading field 1 of the enclosing tuple raises an exception.
+   *
+   * @throws IOException
+   *           if reading the table fails
+   * @throws ParseException
+   *           if the projection cannot be parsed
+   */
+  @Test
+  public void testRead6() throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection("c2.r1");
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    Tuple row = TypesUtils.createTuple(scanner.getSchema());
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    scanner.getValue(row);
+    System.out.println("test read 6 :row: " + row);
+    // recorded output: ({((1,record1_string1)),((2,record2_string1))})
+    verifyRead6Bag((DataBag) row.get(0), new Object[][] {
+        { 1, "record1_string1" }, { 2, "record2_string1" } });
+
+    scanner.advance();
+    scanner.getValue(row);
+    verifyRead6Bag((DataBag) row.get(0), new Object[][] {
+        { 3, "record1_string2" }, { 4, "record2_string2" } });
+
+    reader.close();
+  }
+
+  /**
+   * Prints field 0 of every tuple in {@code bag}; for the leading tuples that
+   * have an entry in {@code expected}, compares the nested record's (f1, f2)
+   * with that entry and requires {@code get(1)} on the tuple to throw.
+   */
+  private static void verifyRead6Bag(DataBag bag, Object[][] expected)
+      throws IOException {
+    int idx = 0;
+    for (Tuple cur : bag) {
+      System.out.println(cur.get(0));
+      if (idx < expected.length) {
+        Tuple record = (Tuple) cur.get(0);
+        Assert.assertEquals(expected[idx][0], record.get(0));
+        Assert.assertEquals(expected[idx][1], record.get(1));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      idx++;
+    }
+  }
+
+  // read, should support project to 3rd level TODO: construct scanner failed
+  // final static String STR_SCHEMA =
+  // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+
+  /**
+   * Disabled test (no {@code @Test} annotation) for the third-level
+   * projection {@code c2.r1.f1}.
+   *
+   * <p>
+   * The scanner is now obtained once; previously a second
+   * {@code getScanner} call replaced the first scanner, which was never
+   * closed.
+   *
+   * <p>
+   * NOTE(review): the expected values are the same ones used for the
+   * {@code c2.r1} projection, including a check of the record's second
+   * field. With only {@code f1} projected they presumably need adjusting
+   * before this test is enabled - confirm against the actual scanner output.
+   *
+   * @throws IOException
+   *           if reading the table fails
+   * @throws ParseException
+   *           if the projection cannot be parsed
+   */
+  public void xtestRead7() throws IOException, ParseException {
+    String projection = "c2.r1.f1";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    scanner.getValue(RowValue);
+    System.out.println("row: " + RowValue.toString());
+    verifyRead7Bag((DataBag) RowValue.get(0), new Object[][] {
+        { 1, "record1_string1" }, { 2, "record2_string1" } });
+
+    scanner.advance();
+    scanner.getValue(RowValue);
+    verifyRead7Bag((DataBag) RowValue.get(0), new Object[][] {
+        { 3, "record1_string2" }, { 4, "record2_string2" } });
+
+    reader.close();
+  }
+
+  /**
+   * Prints field 0 of every tuple in {@code bag}; for the leading tuples that
+   * have an entry in {@code expected}, compares fields 0 and 1 of the nested
+   * tuple with that entry and requires {@code get(1)} on the outer tuple to
+   * throw.
+   */
+  private static void verifyRead7Bag(DataBag bag, Object[][] expected)
+      throws IOException {
+    int idx = 0;
+    for (Tuple cur : bag) {
+      System.out.println(cur.get(0));
+      if (idx < expected.length) {
+        Tuple record = (Tuple) cur.get(0);
+        Assert.assertEquals(expected[idx][0], record.get(0));
+        Assert.assertEquals(expected[idx][1], record.get(1));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      idx++;
+    }
+  }
+
+  // read, should support project to 3rd level TODO: construct scanner failed
+  // final static String STR_SCHEMA =
+  // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+
+  /**
+   * Disabled test (no {@code @Test} annotation) for the third-level
+   * projection {@code c3.c3_1.e}.
+   *
+   * <p>
+   * The scanner is now obtained once; previously a second
+   * {@code getScanner} call replaced the first scanner, which was never
+   * closed.
+   *
+   * <p>
+   * NOTE(review): the expected values below ("record1_string1", ...) are the
+   * ones written to column {@code c2}. Column {@code c3} is populated only
+   * with int/boolean pairs in {@code setUpOnce()}, so these assertions must
+   * be replaced before this test is enabled.
+   *
+   * @throws IOException
+   *           if reading the table fails
+   * @throws ParseException
+   *           if the projection cannot be parsed
+   */
+  public void xtestRead8() throws IOException, ParseException {
+    String projection = "c3.c3_1.e";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    scanner.getValue(RowValue);
+    System.out.println("row: " + RowValue.toString());
+    verifyRead8Bag((DataBag) RowValue.get(0), new Object[][] {
+        { 1, "record1_string1" }, { 2, "record2_string1" } });
+
+    scanner.advance();
+    scanner.getValue(RowValue);
+    verifyRead8Bag((DataBag) RowValue.get(0), new Object[][] {
+        { 3, "record1_string2" }, { 4, "record2_string2" } });
+
+    reader.close();
+  }
+
+  /**
+   * Prints field 0 of every tuple in {@code bag}; for the leading tuples that
+   * have an entry in {@code expected}, compares fields 0 and 1 of the nested
+   * tuple with that entry and requires {@code get(1)} on the outer tuple to
+   * throw.
+   */
+  private static void verifyRead8Bag(DataBag bag, Object[][] expected)
+      throws IOException {
+    int idx = 0;
+    for (Tuple cur : bag) {
+      System.out.println(cur.get(0));
+      if (idx < expected.length) {
+        Tuple record = (Tuple) cur.get(0);
+        Assert.assertEquals(expected[idx][0], record.get(0));
+        Assert.assertEquals(expected[idx][1], record.get(1));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      idx++;
+    }
+  }
+
+  // read stitch simple + record stitch
+  // final static String STR_SCHEMA =
+  // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+  @Test
+  public void testRead9() throws IOException, ParseException {
+    String projection = new String("c.a, c2.r1");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    System.out.println("read 9: " + RowValue.toString());
+    // read 9:
+    // ({(3.1415926),(123.456789)},{((1,record1_string1)),((2,record2_string1))})
+    Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+    int list = 0;
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur.get(0));
+      list++;
+      if (list == 1) {
+        Assert.assertEquals(3.1415926, cur.get(0));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      if (list == 2) {
+        Assert.assertEquals(123.456789, cur.get(0));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+    }
+    Iterator<Tuple> it2 = ((DataBag) RowValue.get(1)).iterator();
+    list = 0;
+    while (it2.hasNext()) {
+      Tuple cur = it2.next();
+      System.out.println(cur.get(0));
+      list++;
+      if (list == 1) {
+        Assert.assertEquals(1, ((Tuple) cur.get(0)).get(0));
+        Assert.assertEquals("record1_string1", ((Tuple) cur.get(0)).get(1));
+        try {
+          ((Tuple) cur.get(0)).get(2);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      if (list == 2) {
+        Assert.assertEquals(2, ((Tuple) cur.get(0)).get(0));
+        Assert.assertEquals("record2_string1", ((Tuple) cur.get(0)).get(1));
+        try {
+          ((Tuple) cur.get(0)).get(2);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+    }
+    scanner.advance();
+    scanner.getValue(RowValue);
+    Iterator<Tuple> it3 = ((DataBag) RowValue.get(0)).iterator();
+    while (it3.hasNext()) {
+      Tuple cur = it3.next();
+      System.out.println(cur.get(0));
+      list++;
+      if (list == 1) {
+        Assert.assertEquals(7654.321, cur.get(0));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      if (list == 2) {
+        Assert.assertEquals(0.123456789, cur.get(0));
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+    }
+
+    Iterator<Tuple> it4 = ((DataBag) RowValue.get(1)).iterator();
+    list = 0;
+    while (it4.hasNext()) {
+      Tuple cur = it4.next();
+      System.out.println(cur.get(0));
+      list++;
+      if (list == 1) {
+        Assert.assertEquals(3, ((Tuple) cur.get(0)).get(0));
+        Assert.assertEquals("record1_string2", ((Tuple) cur.get(0)).get(1));
+        try {
+          ((Tuple) cur.get(0)).get(2);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+      if (list == 2) {
+        Assert.assertEquals(4, ((Tuple) cur.get(0)).get(0));
+        Assert.assertEquals("record2_string2", ((Tuple) cur.get(0)).get(1));
+        try {
+          ((Tuple) cur.get(0)).get(2);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+        try {
+          cur.get(1);
+          Assert.fail("Should throw index out of bounds exception");
+        } catch (Exception e) {
+          System.out.println(e);
+        }
+      }
+    }
+
+    reader.close();
+  }
+
+  // Negative should not support 2nd level collection split
+  @Test
+  public void testSplit1() throws IOException, ParseException {
+    String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+    String STR_STORAGE = "[c.a]";
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), this.getClass().getSimpleName());
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+    try {
+      BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+          STR_STORAGE, false, conf);
+      Assert.fail("should throw exception");
+    } catch (Exception e) {
+      System.out.println(e);
+    }
+  }
+
+  // Negative should not support none_existent column split
+  @Test
+  public void testSplit2() throws IOException, ParseException {
+    String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+    String STR_STORAGE = "[d]";
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), this.getClass().getSimpleName());
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+    try {
+      BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+          STR_STORAGE, false, conf);
+      Assert.fail("should throw exception");
+    } catch (Exception e) {
+      System.out.println(e);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.KeyDistribution;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Testing ColumnGroup APIs called as if in MapReduce Jobs
+ */
+public class TestColumnGroup {
+  static Configuration conf;
+  static Random random;
+  static Path rootPath;
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+    random = new Random(System.nanoTime());
+    rootPath = new Path(System.getProperty("test.build.data",
+        "build/test/data/work-dir"));
+    fs = rootPath.getFileSystem(conf);
+  }
+
+  /** One-time teardown hook; currently a no-op. */
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  /** Builds a key from a random index drawn from [0, max). */
+  BytesWritable makeRandomKey(int max) {
+    int keyIndex = random.nextInt(max);
+    return makeKey(keyIndex);
+  }
+
+  /** Builds the key "key" followed by {@code i} zero-padded to nine digits. */
+  static BytesWritable makeKey(int i) {
+    String keyText = String.format("key%09d", i);
+    return new BytesWritable(keyText.getBytes());
+  }
+
+  /**
+   * Returns {@code prefix} followed by a random number from [0, max),
+   * zero-padded to nine digits.
+   */
+  String makeString(String prefix, int max) {
+    int suffix = random.nextInt(max);
+    return String.format("%s%09d", prefix, suffix);
+  }
+
+  /**
+   * Creates a column group at {@code path} (dropping any existing one) and
+   * fills it with generated rows written through {@code parts} inserters.
+   *
+   * @param parts number of inserters ("part-NNNNNN") to write
+   * @param rows base row count per part; each non-empty part receives between
+   *          rows/2 and rows/2 + rows - 1 rows, and 0 means no rows at all
+   * @param strSchema schema string of the column group
+   * @param path location of the column group
+   * @param properClose if true, a writer is opened and closed at the end and
+   *          the row count reported by the status is checked
+   * @param sorted passed to the writer; also selects sequential keys (true)
+   *          or random keys (false)
+   * @param emptyTFiles part indices that must receive no rows; may be null
+   * @return total number of rows inserted
+   */
+  int createCG(int parts, int rows, String strSchema, Path path,
+      boolean properClose, boolean sorted, int[] emptyTFiles)
+      throws IOException, ParseException {
+    if (fs.exists(path)) {
+      ColumnGroup.drop(path, conf);
+    }
+
+    // Collect the part indices that are to stay empty.
+    Set<Integer> emptyTFileSet = new HashSet<Integer>();
+    if (emptyTFiles != null) {
+      for (int i = 0; i < emptyTFiles.length; ++i) {
+        emptyTFileSet.add(emptyTFiles[i]);
+      }
+    }
+
+    // Create the column group itself; the rows are added below through
+    // writers that are re-opened on the same path.
+    ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, sorted,
+        "pig", "lzo2", false, conf);
+    writer.finish();
+
+    int total = 0;
+    Schema schema = new Schema(strSchema);
+    String colNames[] = schema.getColumns();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    int[] permutation = new int[parts];
+    for (int i = 0; i < parts; ++i) {
+      permutation[i] = i;
+    }
+
+    // Randomly shuffle the part indices so part names are written out of order.
+    for (int i = parts - 1; i > 0; --i) {
+      int targetIndex = random.nextInt(i + 1);
+      int tmp = permutation[i];
+      permutation[i] = permutation[targetIndex];
+      permutation[targetIndex] = tmp;
+    }
+
+    for (int i = 0; i < parts; ++i) {
+      writer = new ColumnGroup.Writer(path, conf);
+      TableInserter inserter = writer.getInserter(String.format("part-%06d",
+          permutation[i]), true);
+      if ((rows > 0) && !emptyTFileSet.contains(permutation[i])) {
+        int actualRows = random.nextInt(rows) + rows / 2;
+        for (int j = 0; j < actualRows; ++j, ++total) {
+          BytesWritable key;
+          if (!sorted) {
+            key = makeRandomKey(rows * 10);
+          } else {
+            // Sequential keys: increasing in the order the rows are written.
+            key = makeKey(total);
+          }
+          TypesUtils.resetTuple(tuple);
+          for (int k = 0; k < tuple.size(); ++k) {
+            try {
+              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+            } catch (ExecException e) {
+              // NOTE(review): the failure is only printed; the row is still
+              // inserted with this column left as reset.
+              e.printStackTrace();
+            }
+          }
+          inserter.insert(key, tuple);
+        }
+      }
+      inserter.close();
+    }
+
+    if (properClose) {
+      writer = new ColumnGroup.Writer(path, conf);
+      writer.close();
+      BasicTableStatus status = getStatus(path);
+      Assert.assertEquals(total, status.getRows());
+    }
+
+    return total;
+  }
+
+  /**
+   * Key generator that repeats every key a number of times. The length of
+   * each run starts at {@code low}, doubles until it has exceeded
+   * {@code high}, then halves until it has dropped below {@code low}, and so
+   * on.
+   */
+  static class DupKeyGen {
+    int low, high;
+    int current;
+    boolean grow = true;
+    int index = 0;
+    int count = 0;
+
+    /** Clamps the bounds: low is at least 10, high at least twice low. */
+    DupKeyGen(int low, int high) {
+      this.low = Math.max(10, low);
+      this.high = Math.max(2 * this.low, high);
+      this.current = this.low;
+    }
+
+    /** Returns the next key, moving to a new key once the run is used up. */
+    BytesWritable next() {
+      if (count == 0) {
+        ++index;
+        count = nextCount();
+      }
+      count--;
+      return makeKey(index);
+    }
+
+    /** Returns the length of the next run and prepares the following one. */
+    int nextCount() {
+      final int runLength = current;
+      boolean pastBound = grow ? (current > high) : (current < low);
+      if (pastBound) {
+        grow = !grow;
+      }
+      current = grow ? current * 2 : current / 2;
+      return runLength;
+    }
+  }
+
+  /**
+   * Creates a sorted column group at {@code path} (dropping any existing one)
+   * whose keys come from a {@link DupKeyGen}, i.e. contain runs of duplicates.
+   * A writer is opened and closed at the end and the row count reported by the
+   * status is checked.
+   *
+   * @param parts number of inserters ("part-NNNNNN") to write
+   * @param rows base row count per part; each part receives between rows*2/3
+   *          and 2*(rows*2/3) - 1 rows, and 0 means no rows at all
+   * @param strSchema schema string of the column group
+   * @param path location of the column group
+   * @return total number of rows inserted
+   */
+  int createCGDupKeys(int parts, int rows, String strSchema, Path path)
+      throws IOException, ParseException {
+    if (fs.exists(path)) {
+      ColumnGroup.drop(path, conf);
+    }
+
+    // Create the column group itself; the rows are added below through
+    // writers that are re-opened on the same path.
+    ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, true,
+        "pig", "lzo2", false, conf);
+    writer.finish();
+
+    int total = 0;
+    DupKeyGen keyGen = new DupKeyGen(10, rows * 3);
+    Schema schema = new Schema(strSchema);
+    String colNames[] = schema.getColumns();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    int[] permutation = new int[parts];
+    for (int i = 0; i < parts; ++i) {
+      permutation[i] = i;
+    }
+
+    // Randomly shuffle the part indices so part names are written out of order.
+    for (int i = parts - 1; i > 0; --i) {
+      int targetIndex = random.nextInt(i + 1);
+      int tmp = permutation[i];
+      permutation[i] = permutation[targetIndex];
+      permutation[targetIndex] = tmp;
+    }
+
+    for (int i = 0; i < parts; ++i) {
+      writer = new ColumnGroup.Writer(path, conf);
+      TableInserter inserter = writer.getInserter(String.format("part-%06d",
+          permutation[i]), true);
+      if (rows > 0) {
+        int actualRows = random.nextInt(rows * 2 / 3) + rows * 2 / 3;
+        for (int j = 0; j < actualRows; ++j, ++total) {
+          // The single generator is shared by all parts, so its key index
+          // never decreases across the whole write sequence.
+          BytesWritable key = keyGen.next();
+          TypesUtils.resetTuple(tuple);
+          for (int k = 0; k < tuple.size(); ++k) {
+            try {
+              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+            } catch (ExecException e) {
+              // NOTE(review): the failure is only printed; the row is still
+              // inserted with this column left as reset.
+              e.printStackTrace();
+            }
+          }
+          inserter.insert(key, tuple);
+        }
+      }
+      inserter.close();
+    }
+
+    writer = new ColumnGroup.Writer(path, conf);
+    writer.close();
+    BasicTableStatus status = getStatus(path);
+    Assert.assertEquals(total, status.getRows());
+
+    return total;
+  }
+
+  /**
+   * Asks the column group for {@code numSplits} range splits, scans every
+   * split with a fresh reader and verifies that the splits together yield
+   * {@code totalRows} rows and that their block-distribution lengths add up
+   * to the size reported by the column group status.
+   *
+   * @param numSplits number of splits passed to {@code rangeSplit}
+   * @param totalRows number of rows expected over all splits
+   * @param strProjection projection applied to every reader
+   * @param path location of the column group
+   */
+  void rangeSplitCG(int numSplits, int totalRows, String strProjection,
+      Path path) throws IOException, ParseException {
+    long totalBytes;
+    List<CGRangeSplit> splits;
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection(strProjection);
+      totalBytes = reader.getStatus().getSize();
+      splits = reader.rangeSplit(numSplits);
+    } finally {
+      reader.close();
+    }
+
+    int total = 0;
+    for (CGRangeSplit split : splits) {
+      // One reader per split; close it so the loop does not leak readers.
+      ColumnGroup.Reader splitReader = new ColumnGroup.Reader(path, conf);
+      try {
+        splitReader.setProjection(strProjection);
+        total += doReadOnly(splitReader.getScanner(split, true));
+        totalBytes -= splitReader.getBlockDistribution(split).getLength();
+      } finally {
+        splitReader.close();
+      }
+    }
+    // JUnit convention: expected value first, so failure messages read right.
+    Assert.assertEquals(totalRows, total);
+    Assert.assertEquals(0L, totalBytes);
+  }
+
+  /** Runs {@link #rangeSplitCG} once for every positive entry of numSplits. */
+  void doRangeSplit(int[] numSplits, int totalRows, String projection, Path path)
+      throws IOException, ParseException {
+    for (int idx = 0; idx < numSplits.length; ++idx) {
+      int splitCount = numSplits[idx];
+      if (splitCount <= 0) {
+        continue;
+      }
+      rangeSplitCG(splitCount, totalRows, projection, path);
+    }
+  }
+
+  /**
+   * Splits the column group by key boundaries, scans every key range with a
+   * fresh reader and verifies that the ranges together yield
+   * {@code totalRows} rows.
+   *
+   * The boundaries come from the reader's key distribution when it holds at
+   * least {@code numSplits} entries; otherwise random keys are made up.
+   *
+   * @param numSplits number of key ranges wanted
+   * @param totalRows number of rows expected over all ranges
+   * @param strProjection projection applied to every reader
+   * @param path location of the column group
+   */
+  void keySplitCG(int numSplits, int totalRows, String strProjection, Path path)
+      throws IOException, ParseException {
+    long totalBytes;
+    KeyDistribution keyDistri;
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection(strProjection);
+      totalBytes = reader.getStatus().getSize();
+      keyDistri = reader.getKeyDistribution(numSplits * 10);
+      Assert.assertEquals(totalBytes, keyDistri.length());
+    } finally {
+      // Close even when the assertion above fails.
+      reader.close();
+    }
+    BytesWritable[] keys = null;
+    if (keyDistri.size() >= numSplits) {
+      keyDistri.resize(numSplits);
+      Assert.assertEquals(totalBytes, keyDistri.length());
+      RawComparable[] rawComparables = keyDistri.getKeys();
+      keys = new BytesWritable[rawComparables.length];
+      for (int i = 0; i < keys.length; ++i) {
+        // Copy the raw key bytes into a BytesWritable of the same size.
+        keys[i] = new BytesWritable();
+        keys[i].setSize(rawComparables[i].size());
+        System.arraycopy(rawComparables[i].buffer(),
+            rawComparables[i].offset(), keys[i].get(), 0, rawComparables[i]
+                .size());
+      }
+    } else {
+      int targetSize = Math.min(totalRows / 10, numSplits);
+      // revert to manually cooked up keys.
+      Set<Integer> keySets = new TreeSet<Integer>();
+      while (keySets.size() < targetSize) {
+        keySets.add(random.nextInt(totalRows));
+      }
+      keys = new BytesWritable[targetSize];
+      // The TreeSet iterates in ascending order, so the keys come out sorted.
+      int j = 0;
+      for (int keyIndex : keySets) {
+        keys[j] = makeKey(keyIndex);
+        ++j;
+      }
+    }
+
+    int total = 0;
+    for (int i = 0; i < keys.length; ++i) {
+      // One reader per key range; close it so the loop does not leak readers.
+      ColumnGroup.Reader rangeReader = new ColumnGroup.Reader(path, conf);
+      try {
+        rangeReader.setProjection(strProjection);
+        // The first range is open at the start and the last one at the end.
+        BytesWritable begin = (i == 0) ? null : keys[i - 1];
+        BytesWritable end = (i == keys.length - 1) ? null : keys[i];
+        total += doReadOnly(rangeReader.getScanner(begin, end, true));
+      } finally {
+        rangeReader.close();
+      }
+    }
+    // JUnit convention: expected value first, so failure messages read right.
+    Assert.assertEquals(totalRows, total);
+  }
+
+  /** Runs {@link #keySplitCG} once for every positive entry of numSplits. */
+  void doKeySplit(int[] numSplits, int totalRows, String projection, Path path)
+      throws IOException, ParseException {
+    for (int idx = 0; idx < numSplits.length; ++idx) {
+      int splitCount = numSplits[idx];
+      if (splitCount <= 0) {
+        continue;
+      }
+      keySplitCG(splitCount, totalRows, projection, path);
+    }
+  }
+
+  BasicTableStatus getStatus(Path path) throws IOException, ParseException {
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      return reader.getStatus();
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Creates a column group with {@link #createCG} and reads it back through
+   * range splits and, for sorted column groups, through key splits as well.
+   *
+   * @param path location of the column group
+   * @param parts number of parts to write
+   * @param rows base row count per part (0 writes no rows)
+   * @param schema schema string of the column group
+   * @param projection projection used when reading
+   * @param properClose whether the column group is closed after writing
+   * @param sorted whether the column group is sorted
+   * @param emptyTFiles part indices that must stay empty; may be null
+   */
+  void doReadWrite(Path path, int parts, int rows, String schema,
+      String projection, boolean properClose, boolean sorted, int[] emptyTFiles)
+      throws IOException, ParseException {
+    int totalRows = createCG(parts, rows, schema, path, properClose, sorted,
+        emptyTFiles);
+    if (rows == 0) {
+      // No rows were requested, so none may have been written. The previous
+      // check compared rows with 0 and could never fail.
+      Assert.assertEquals(0, totalRows);
+    }
+
+    // Non-positive split counts (e.g. parts / 2 for small parts) are skipped
+    // by doRangeSplit and doKeySplit.
+    doRangeSplit(new int[] { 1, 2, parts / 2, parts, 2 * parts }, totalRows,
+        projection, path);
+    if (sorted) {
+      doKeySplit(new int[] { 1, 2, parts / 2, parts, 2 * parts, 10 * parts },
+          totalRows, projection, path);
+    }
+  }
+
+  /**
+   * Walks the scanner to its end and counts the rows. For every row it
+   * randomly reads the key, the value, both or nothing. The scanner is always
+   * closed, even when reading fails.
+   *
+   * @param scanner scanner to drain; closed by this method
+   * @return number of rows seen
+   */
+  int doReadOnly(TableScanner scanner) throws IOException, ParseException {
+    int total = 0;
+    try {
+      BytesWritable key = new BytesWritable();
+      Tuple value = TypesUtils.createTuple(scanner.getSchema());
+      for (; !scanner.atEnd(); scanner.advance()) {
+        ++total;
+        // nextInt(4) is uniform over 0..3; nextInt() % 4 also produced
+        // negative values, which all fell into the no-op branch.
+        switch (random.nextInt(4)) {
+        case 0:
+          scanner.getKey(key);
+          break;
+        case 1:
+          scanner.getValue(value);
+          break;
+        case 2:
+          scanner.getKey(key);
+          scanner.getValue(value);
+          break;
+        default: // no-op.
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+
+    return total;
+  }
+
+  /**
+   * Scanners requested with a null split, or with null begin and end keys,
+   * must each return every row.
+   */
+  @Test
+  public void testNullSplits() throws IOException, ParseException {
+    final Path cgPath = new Path(rootPath, "TestColumnGroupNullSplits");
+    final int expectedRows = createCG(2, 10, "a, b, c", cgPath, true, true, null);
+
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(cgPath, conf);
+    reader.setProjection("a,d,c,f");
+
+    int rowsViaNullSplit = doReadOnly(reader.getScanner(null, false));
+    Assert.assertEquals(expectedRows, rowsViaNullSplit);
+
+    int rowsViaNullKeys = doReadOnly(reader.getScanner(null, null, false));
+    Assert.assertEquals(expectedRows, rowsViaNullKeys);
+
+    reader.close();
+  }
+
+  /** Range splitting with a split count of -1 must still cover every row. */
+  @Test
+  public void testNegativeSplits() throws IOException, ParseException {
+    final Path cgPath = new Path(rootPath, "TestNegativeSplits");
+    final String schema = "a, b, c";
+    final String projection = "a,d,c,f";
+    int rowsWritten = createCG(2, 100, schema, cgPath, true, true, null);
+    rangeSplitCG(-1, rowsWritten, projection, cgPath);
+  }
+
+  /**
+   * Column group without any part, exercised as unclosed/unsorted,
+   * closed/unsorted and closed/sorted.
+   */
+  @Test
+  public void testEmptyCG() throws IOException, ParseException {
+    final Path cgPath = new Path(rootPath, "TestColumnGroupEmptyCG");
+    // Each entry is { properClose, sorted }.
+    final boolean[][] modes = { { false, false }, { true, false },
+        { true, true } };
+    for (boolean[] mode : modes) {
+      doReadWrite(cgPath, 0, 0, "a, b, c", "a, d, c, f", mode[0], mode[1], null);
+    }
+  }
+
+  /**
+   * Column group with two parts but no rows, exercised as unclosed/unsorted,
+   * closed/unsorted and closed/sorted.
+   */
+  @Test
+  public void testEmptyTFiles() throws IOException, ParseException {
+    final Path cgPath = new Path(rootPath, "TestColumnGroupEmptyTFile");
+    // Each entry is { properClose, sorted }.
+    final boolean[][] modes = { { false, false }, { true, false },
+        { true, true } };
+    for (boolean[] mode : modes) {
+      doReadWrite(cgPath, 2, 0, "a, b, c", "a, d, c, f", mode[0], mode[1], null);
+    }
+  }
+
+  /**
+   * Column group with two parts of about 500 rows each, exercised as
+   * unclosed/unsorted, closed/unsorted and closed/sorted.
+   *
+   * The class is a JUnit 4 test (annotation driven, no TestCase base class),
+   * so without the annotation this method was silently never run.
+   */
+  @Test
+  public void testNormalCases() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupNormal");
+    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
+    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
+    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
+  }
+
+  @Test
+  public void testSomeEmptyTFiles() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
+		for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
+      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
+          emptyTFiles);
+      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
+          emptyTFiles);
+      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,
+          emptyTFiles);    
+    }
+  }
+
+  /**
+   * Counts the rows of the column group at {@code path} with a full scan.
+   * Both the scanner and the reader are closed before returning, even when
+   * the scan fails.
+   *
+   * @param path location of the column group
+   * @param projection projection to apply, or null to leave the reader's
+   *          projection untouched
+   * @return number of rows scanned
+   */
+  int countRows(Path path, String projection) throws IOException,
+      ParseException {
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      if (projection != null) {
+        reader.setProjection(projection);
+      }
+      int totalRows = 0;
+      TableScanner scanner = reader.getScanner(null, true);
+      try {
+        for (; !scanner.atEnd(); scanner.advance()) {
+          ++totalRows;
+        }
+      } finally {
+        scanner.close();
+      }
+      return totalRows;
+    } finally {
+      // The reader used to be left open.
+      reader.close();
+    }
+  }
+
+  /**
+   * The row count must be the same with no projection set and with an empty
+   * projection.
+   */
+  @Test
+  public void testProjection() throws IOException, ParseException {
+    final Path cgPath = new Path(rootPath, "TestColumnGroupProjection");
+    final int expectedRows = createCG(2, 250, "a, b, c", cgPath, true, true,
+        null);
+
+    int rowsWithoutProjection = countRows(cgPath, null);
+    Assert.assertEquals(expectedRows, rowsWithoutProjection);
+
+    int rowsWithEmptyProjection = countRows(cgPath, "");
+    Assert.assertEquals(expectedRows, rowsWithEmptyProjection);
+  }
+
+  /** Key splitting over a sorted column group that contains duplicate keys. */
+  @Test
+  public void testDuplicateKeys() throws IOException, ParseException {
+    final Path cgPath = new Path(rootPath, "TestColumnGroupDuplicateKeys");
+    final int rowsWritten = createCGDupKeys(2, 250, "a, b, c", cgPath);
+    final int[] splitCounts = { 1, 5 };
+    doKeySplit(splitCounts, rowsWritten, "a, d, c, f", cgPath);
+  }
+
+  /**
+   * Key splitting over a sorted column group written with a larger minimum
+   * TFile block size (640 KB instead of the 64 KB set in setUpOnce).
+   *
+   * The configuration object is static and shared by all tests of this class,
+   * so the previous block size is restored afterwards; otherwise the tests
+   * that run later would depend on the execution order.
+   */
+  @Test
+  public void testSortedCGKeySplit() throws IOException, ParseException {
+    final String minBlockKey = "table.output.tfile.minBlock.size";
+    final int previousMinBlock = conf.getInt(minBlockKey, 64 * 1024);
+    conf.setInt(minBlockKey, 640 * 1024);
+    try {
+      Path path = new Path(rootPath, "TestSortedCGKeySplit");
+      int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+      doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
+          path);
+    } finally {
+      conf.setInt(minBlockKey, previousMinBlock);
+    }
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestColumnGroupInserters {
+  final static String outputFile = "TestColumnGroupInserters";
+  final static private Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static Path path;
+  private static ColumnGroup.Writer writer;
+
+  /**
+   * One-time fixture setup: wires a local file system and derives the output
+   * path from its working directory.
+   */
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    // set default file system to local file system
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+
+    // must set a conf here to the underlying FS, or it barks
+    final RawLocalFileSystem rawLocal = new RawLocalFileSystem();
+    rawLocal.setConf(conf);
+    final LocalFileSystem localFs = new LocalFileSystem(rawLocal);
+    fs = localFs;
+    path = new Path(localFs.getWorkingDirectory(), outputFile);
+    System.out.println("output file: " + path);
+  }
+
+  /**
+   * One-time teardown: calls finish(), which finishes the shared writer if a
+   * test left one behind (writer still non-null).
+   */
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    finish();
+  }
+
+  /**
+   * Opens an inserter and closes it again without inserting any row.
+   * NOTE(review): the original row insertion was commented out, so despite
+   * its name this test does not insert null values.
+   */
+  @Test
+  public void testInsertNullValues() throws IOException, ParseException {
+    fs.delete(path, true);
+    System.out.println("testInsertNullValues");
+
+    final String schema = "abc, def";
+    writer = new ColumnGroup.Writer(path, schema, false, "pig", "lzo2", true,
+        conf);
+    TableInserter inserter = writer.getInserter("part1", true);
+    inserter.close();
+    close();
+  }
+
+  /**
+   * Inserting a row built from a different schema than the writer's must
+   * fail with an IOException.
+   */
+  @Test
+  public void testFailureInvalidSchema() throws IOException, ParseException {
+    fs.delete(path, true);
+    System.out.println("testFailureInvalidSchema");
+
+    writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+        true, conf);
+    TableInserter inserter = writer.getInserter("part1", true);
+
+    // Three columns, whereas the writer was created with two.
+    Schema otherSchema = Schema.parse("xyz, ijk, def");
+    Tuple mismatchedRow = TypesUtils.createTuple(otherSchema);
+
+    try {
+      BytesWritable key = new BytesWritable("key".getBytes());
+      inserter.insert(key, mismatchedRow);
+      Assert.fail("Failed to catch diff schemas.");
+    } catch (IOException expected) {
+      // noop, expecting exceptions
+    } finally {
+      inserter.close();
+      close();
+    }
+  }
+
+  /**
+   * Requesting an inserter from a writer that is already closed must fail
+   * with an IOException.
+   */
+  @Test
+  public void testFailureGetInserterAfterWriterClosed() throws IOException,
+      ParseException {
+    fs.delete(path, true);
+    System.out.println("testFailureGetInserterAfterWriterClosed");
+
+    final String schema = "abc, def";
+    writer = new ColumnGroup.Writer(path, schema, false, "pig", "lzo2", true,
+        conf);
+    try {
+      writer.close();
+      writer.getInserter("part1", true);
+      Assert.fail("Failed to catch getInsertion after writer closure.");
+    } catch (IOException expected) {
+      // noop, expecting exceptions
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * After one row has been written and both the inserter and the writer have
+   * been closed, requesting another inserter must fail with an IOException.
+   */
+  @Test
+  public void testFailureInsertAfterClose() throws IOException, ExecException,
+      ParseException {
+    fs.delete(path, true);
+    System.out.println("testFailureInsertAfterClose");
+
+    writer = new ColumnGroup.Writer(path, "abc, def ", false, "pig", "lzo2",
+        true, conf);
+    TableInserter firstInserter = writer.getInserter("part1", true);
+
+    // Column 0 holds a string, column 1 a map.
+    SortedMap<String, String> people = new TreeMap<String, String>();
+    people.put("john", "boy");
+    Tuple row = TypesUtils.createTuple(writer.getSchema());
+    row.set(0, "val1");
+    row.set(1, people);
+
+    firstInserter.insert(new BytesWritable("key".getBytes()), row);
+    firstInserter.close();
+    writer.close();
+
+    try {
+      writer.getInserter("part2", true);
+      Assert.fail("Failed to catch insertion after closure.");
+    } catch (IOException expected) {
+      // noop, expecting exceptions
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Setting a second column on a row created from a one-column schema must
+   * fail with an ExecException.
+   */
+  @Test
+  public void testFailureInsertXtraColumn() throws IOException, ExecException,
+      ParseException {
+    fs.delete(path, true);
+    System.out.println("testFailureInsertXtraColumn");
+
+    writer = new ColumnGroup.Writer(path, "abc ", false, "pig", "lzo2", true,
+        conf);
+    TableInserter inserter = writer.getInserter("part1", true);
+
+    SortedMap<String, String> people = new TreeMap<String, String>();
+    people.put("john", "boy");
+
+    try {
+      Tuple row = TypesUtils.createTuple(writer.getSchema());
+      row.set(0, "val1");
+      // Index 1 is outside the single-column schema.
+      row.set(1, people);
+      Assert
+          .fail("Failed to catch insertion an extra column not defined in schema.");
+    } catch (ExecException expected) {
+      // noop, expecting exceptions
+    } finally {
+      inserter.close();
+      close();
+    }
+  }
+
+  /** Writes a single row (string column plus map column) through one inserter. */
+  @Test
+  public void testInsertOneRow() throws IOException, ExecException,
+      ParseException {
+    fs.delete(path, true);
+    System.out.println("testInsertOneRow");
+
+    writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+        true, conf);
+    TableInserter inserter = writer.getInserter("part1", true);
+
+    SortedMap<String, String> people = new TreeMap<String, String>();
+    people.put("john", "boy");
+
+    Tuple row = TypesUtils.createTuple(writer.getSchema());
+    row.set(0, "val1");
+    row.set(1, people);
+
+    BytesWritable key = new BytesWritable("key".getBytes());
+    inserter.insert(key, row);
+
+    inserter.close();
+    close();
+  }
+
+  /**
+   * Writes two rows with the same key through one inserter, reusing the row
+   * tuple and the map between the two inserts.
+   */
+  @Test
+  public void testInsert2Rows() throws IOException, ExecException,
+      ParseException {
+    fs.delete(path, true);
+    System.out.println("testInsert2Rows");
+
+    writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+        true, conf);
+    TableInserter inserter = writer.getInserter("part1", true);
+    Tuple row = TypesUtils.createTuple(writer.getSchema());
+    SortedMap<String, String> people = new TreeMap<String, String>();
+
+    // row 1
+    people.put("john", "boy");
+    row.set(0, "val1");
+    row.set(1, people);
+    inserter.insert(new BytesWritable("key".getBytes()), row);
+
+    // row 2
+    TypesUtils.resetTuple(row);
+    people.put("joe", "boy");
+    people.put("jane", "girl");
+    row.set(0, "val2");
+    // map should contain 3 k->v pairs
+    row.set(1, people);
+    inserter.insert(new BytesWritable("key".getBytes()), row);
+
+    // NOTE(review): the inserter is closed twice, as in the original; confirm
+    // whether the second close is intended to exercise a repeated close.
+    inserter.close();
+    inserter.close();
+    close();
+  }
+
+  /**
+   * Writes two rows through each of two inserters that are open at the same
+   * time on an unsorted column group.
+   */
+  @Test
+  public void testInsert2Inserters() throws IOException, ExecException,
+      ParseException {
+    fs.delete(path, true);
+    System.out.println("testInsert2Inserters");
+
+    writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+        true, conf);
+    TableInserter firstInserter = writer.getInserter("part1", true);
+    TableInserter secondInserter = writer.getInserter("part2", true);
+    Tuple row = TypesUtils.createTuple(writer.getSchema());
+    SortedMap<String, String> people = new TreeMap<String, String>();
+
+    // row 1
+    // NOTE(review): the map is filled but not set on the row here, so the
+    // first row is inserted with only column 0 set; confirm this is intended.
+    row.set(0, "val1");
+    people.put("john", "boy");
+    firstInserter.insert(new BytesWritable("key11".getBytes()), row);
+    secondInserter.insert(new BytesWritable("key21".getBytes()), row);
+
+    // row 2
+    TypesUtils.resetTuple(row);
+    row.set(0, "val2");
+    people.put("joe", "boy");
+    people.put("jane", "girl");
+    // map should contain 3 k->v pairs
+    row.set(1, people);
+    secondInserter.insert(new BytesWritable("key22".getBytes()), row);
+    firstInserter.insert(new BytesWritable("key12".getBytes()), row);
+
+    firstInserter.close();
+    secondInserter.close();
+    close();
+  }
+
+  /**
+   * On a sorted column group, two inserters write interleaved key ranges
+   * (part1: key1, key4; part2: key2, key3). Closing the inserters and the
+   * writer is expected to fail with an IOException.
+   */
+  @Test
+  public void testFailureOverlappingKeys() throws IOException, ExecException,
+      ParseException {
+    fs.delete(path, true);
+    System.out.println("testFailureOverlappingKeys");
+    writer = new ColumnGroup.Writer(path, "abc, def ", true, "pig", "lzo2",
+        true, conf);
+    TableInserter ins1 = writer.getInserter("part1", false);
+    TableInserter ins2 = writer.getInserter("part2", false);
+
+    // row 1
+
+    Tuple row = TypesUtils.createTuple(writer.getSchema());
+    row.set(0, new String("val1"));
+
+    SortedMap<String, String> map = new TreeMap<String, String>();
+    map.put("john", "boy");
+    row.set(1, map);
+
+    ins1.insert(new BytesWritable("key1".getBytes()), row);
+    ins2.insert(new BytesWritable("key2".getBytes()), row);
+
+    // row 2
+    TypesUtils.resetTuple(row);
+    row.set(0, new String("val2"));
+    map.put("joe", "boy");
+    map.put("jane", "girl");
+    // map should contain 3 k->v pairs
+    row.set(1, map);
+
+    ins2.insert(new BytesWritable("key3".getBytes()), row);
+    // ins2.close();
+    ins1.insert(new BytesWritable("key4".getBytes()), row);
+    try {
+      // Any of these three calls may be the one that reports the overlap;
+      // the test only requires that one of them throws.
+      ins1.close();
+      ins2.close();
+      close();
+      Assert.fail("Failed to detect overlapping keys.");
+    } catch (IOException e) {
+      // noop, exceptions expected
+    } finally {
+      // Dropped here because close() does not reach its own drop when an
+      // earlier call in the try block throws.
+      ColumnGroup.drop(path, conf);
+    }
+  }
+
+  /** Finishes the shared writer, if there is one. */
+  private static void finish() throws IOException {
+    if (writer == null) {
+      return;
+    }
+    writer.finish();
+  }
+
+  /**
+   * Closes the shared writer, if there is one, clears the reference and drops
+   * the column group. When closing throws, the reference is kept and nothing
+   * is dropped.
+   */
+  private static void close() throws IOException {
+    if (writer == null) {
+      return;
+    }
+    writer.close();
+    writer = null;
+    ColumnGroup.drop(path, conf);
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for creating {@link ColumnGroup.Writer} instances against a fresh or
+ * an already populated path on the local file system. Every test releases the
+ * shared writer and drops the column group through {@link #close()}.
+ */
+public class TestColumnGroupOpen {
+  final static String outputFile = "TestColumnGroupOpen";
+  final private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static Path path;
+  // Writer shared by the tests; released through finish() / close().
+  private static ColumnGroup.Writer writer;
+
+  /** Sets up the local file system and the path used by every test. */
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    // set default file system to local file system
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+
+    // must set a conf here to the underlying FS, or it barks
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    rawLFS.setConf(conf);
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), outputFile);
+    System.out.println("output file: " + path);
+  }
+
+  /** Finishes the shared writer if a test left one behind. */
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    finish();
+  }
+
+  /** Creates a writer on the test path and releases it again. */
+  @Test
+  public void testNew() throws IOException, ParseException {
+    System.out.println("testNew");
+    writer = new ColumnGroup.Writer(path, "abc, def ", false, "pig", "lzo2",
+        true, conf);
+    // NOTE: don't call writer.close() here
+    close();
+  }
+
+  /**
+   * Creating a second writer with a different sorted flag on the same path is
+   * expected to fail with an {@link IOException}.
+   */
+  @Test
+  public void testFailureExistingSortedDiff() throws IOException,
+      ParseException {
+    System.out.println("testFailureExistingSortedDiff");
+    try {
+      writer = new ColumnGroup.Writer(path, "abc, def ", false, "pig", "lzo2",
+          true, conf);
+      finish();
+      writer = new ColumnGroup.Writer(path, "abc, def", true, "pig", "lzo2",
+          false, conf);
+      Assert.fail("Failed to catch sorted flag alteration.");
+    } catch (IOException e) {
+      // noop, expecting exceptions
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Creates a writer, closes it explicitly and then runs the shared cleanup,
+   * which closes the same writer once more before dropping the column group.
+   */
+  @Test
+  public void testExisting() throws IOException, ParseException {
+    System.out.println("testExisting");
+    writer = new ColumnGroup.Writer(path, " abc ,  def ", false, "pig", "lzo2",
+        false, conf);
+    writer.close();
+    close();
+  }
+
+  /**
+   * Creating a writer on a path that is a plain file is expected to fail with
+   * an {@link IOException}.
+   */
+  @Test
+  public void testFailurePathNotDir() throws IOException, ParseException {
+    System.out.println("testFailurePathNotDir");
+    try {
+      ColumnGroup.drop(path, conf);
+
+      // Occupy the path with an empty regular file.
+      FSDataOutputStream out = fs.create(path);
+      out.close();
+      writer = new ColumnGroup.Writer(path, "   abc ,  def   ", false, "pig",
+          "lzo2", false, conf);
+      Assert.fail("Failed to catch path not a directory.");
+    } catch (IOException e) {
+      // noop, expecting exceptions
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Creating a writer on a path that already contains a meta file is expected
+   * to fail with an {@link IOException}.
+   */
+  @Test
+  public void testFailureMetaFileExists() throws IOException, ParseException {
+    System.out.println("testFailureMetaFileExists");
+    try {
+      fs.delete(path, true);
+      // Pre-create an empty meta file under the column group path.
+      FSDataOutputStream out = fs.create(new Path(path, ColumnGroup.META_FILE));
+      out.close();
+      writer = new ColumnGroup.Writer(path, "abc", false, "pig", "lzo2", false,
+          conf);
+      Assert.fail("Failed to catch meta file existence.");
+    } catch (IOException e) {
+      // noop, expecting exceptions
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Creating a second writer with a different schema on the same path is
+   * expected to fail with an {@link IOException}.
+   */
+  @Test
+  public void testFailureDiffSchema() throws IOException, ParseException {
+    System.out.println("testFailureDiffSchema");
+    try {
+      writer = new ColumnGroup.Writer(path, "abc", false, "pig", "lzo2", false,
+          conf);
+      finish();
+      writer = new ColumnGroup.Writer(path, "efg", false, "pig", "lzo2", false,
+          conf);
+      Assert.fail("Failed to catch schema differences.");
+    } catch (IOException e) {
+      // noop, expecting exceptions
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Opens three writers on the same path, obtains one inserter from each and
+   * closes the inserters; no failure is expected.
+   */
+  @Test
+  public void testMultiWriters() throws IOException, ParseException {
+    System.out.println("testMultiWriters");
+    ColumnGroup.Writer writer1 = null;
+    ColumnGroup.Writer writer2 = null;
+    ColumnGroup.Writer writer3 = null;
+    try {
+      writer1 = new ColumnGroup.Writer(path, "abc", false, "pig", "lzo2", true,
+          conf);
+      writer2 = new ColumnGroup.Writer(path, conf);
+      writer3 = new ColumnGroup.Writer(path, conf);
+
+      TableInserter ins1 = writer1.getInserter("part1", false);
+      TableInserter ins2 = writer2.getInserter("part2", false);
+      TableInserter ins3 = writer3.getInserter("part3", false);
+      ins1.close();
+      ins2.close();
+      ins3.close();
+    } finally {
+      // Nested finally blocks: a failure while finishing one writer must not
+      // skip the remaining writers or the final drop of the column group.
+      try {
+        finish(writer1);
+      } finally {
+        try {
+          finish(writer2);
+        } finally {
+          try {
+            finish(writer3);
+          } finally {
+            close();
+          }
+        }
+      }
+    }
+  }
+
+  /** Finishes the shared writer, if any. */
+  private static void finish() throws IOException {
+    finish(writer);
+  }
+
+  /**
+   * Finishes the given writer; a {@code null} writer is ignored.
+   *
+   * @param target writer to finish, may be {@code null}
+   * @throws IOException if the writer's {@code finish()} call fails
+   */
+  private static void finish(ColumnGroup.Writer target) throws IOException {
+    if (target != null) {
+      target.finish();
+    }
+  }
+
+  /**
+   * Closes and forgets the shared writer (if any), then drops the column
+   * group at the test path regardless of whether a writer was set.
+   *
+   * @throws IOException if closing the writer or dropping the column group
+   *           fails
+   */
+  private static void close() throws IOException {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+    ColumnGroup.drop(path, conf);
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestColumnGroupProjections {
+  final static String outputFile = "TestColumnGroupProjections";
+  final static private Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static Path path;
+  private static Schema schema;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException, ParseException {
+    // set default file system to local file system
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+
+    // must set a conf here to the underlying FS, or it barks
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    rawLFS.setConf(conf);
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), outputFile);
+    System.out.println("output file: " + path);
+
+    schema = new Schema("a,b,c,d,e,f,g");
+
+    ColumnGroup.Writer writer = new ColumnGroup.Writer(path, schema, false,
+        "pig", "lzo2", true, conf);
+    TableInserter ins = writer.getInserter("part0", true);
+
+    // row 1
+    Tuple row = TypesUtils.createTuple(writer.getSchema());
+    row.set(0, "a1");
+    row.set(1, "b1");
+    row.set(2, "c1");
+    row.set(3, "d1");
+    row.set(4, "e1");
+    row.set(5, "f1");
+    row.set(6, "g1");
+    ins.insert(new BytesWritable("k1".getBytes()), row);
+
+    // row 2
+    TypesUtils.resetTuple(row);
+    row.set(0, "a2");
+    row.set(1, "b2");
+    row.set(2, "c2");
+    row.set(3, "d2");
+    row.set(4, "e2");
+    row.set(5, "f2");
+    row.set(6, "g2");
+    ins.insert(new BytesWritable("k2".getBytes()), row);
+    ins.close();
+
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  @Test
+  // default projection (without any projection) should return every column
+  public void testDefaultProjection() throws IOException, ExecException,
+      ParseException {
+    System.out.println("testDefaultProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    TableScanner scanner = reader.getScanner(null, false);
+
+    defTest(reader, scanner);
+
+    scanner.close();
+    reader.close();
+  }
+
+  private void defTest(ColumnGroup.Reader reader, TableScanner scanner)
+      throws IOException, ParseException {
+    BytesWritable key = new BytesWritable();
+    Tuple row = TypesUtils.createTuple(reader.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k1".getBytes()));
+    scanner.getValue(row);
+    Assert.assertEquals("a1", row.get(0));
+    Assert.assertEquals("b1", row.get(1));
+    Assert.assertEquals("g1", row.get(6));
+
+    // move to next row
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k2".getBytes()));
+    TypesUtils.resetTuple(row);
+    scanner.getValue(row);
+    Assert.assertEquals("a2", row.get(0));
+    Assert.assertEquals("b2", row.get(1));
+    Assert.assertEquals("g2", row.get(6));
+  }
+
+  @Test
+  // null projection should be same as default (fully projected) projection
+  public void testNullProjection() throws IOException, ExecException,
+      ParseException {
+    System.out.println("testNullProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection(null);
+    TableScanner scanner = reader.getScanner(null, false);
+
+    defTest(reader, scanner);
+
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  // empty projection should project no columns at all
+  public void testEmptyProjection() throws Exception {
+    System.out.println("testEmptyProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection("");
+
+    Tuple row = TypesUtils.createTuple(reader.getSchema());
+    TableScanner scanner = reader.getScanner(null, false);
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k1".getBytes()));
+    scanner.getValue(row);
+    try {
+      row.get(0);
+      Assert.fail("Failed to catch out of boundary exceptions.");
+    } catch (ExecException e) {
+      // no op, expecting out of bounds exceptions
+    }
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k2".getBytes()));
+    TypesUtils.resetTuple(row);
+    scanner.getValue(row);
+    // Assert.assertEquals("c2", row.get(0));
+
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  // a column name that does not exist in the column group
+  public void testOneNonExistentProjection() throws Exception {
+    System.out.println("testOneNonExistentProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection("X");
+
+    Tuple row = TypesUtils.createTuple(1);
+    TableScanner scanner = reader.getScanner(null, false);
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k1".getBytes()));
+    scanner.getValue(row);
+    Assert.assertNull(row.get(0));
+    try {
+      row.get(1);
+      Assert.fail("Failed to catch out of boundary exceptions.");
+    } catch (ExecException e) {
+      // no op, expecting out of bounds exceptions
+    }
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k2".getBytes()));
+    TypesUtils.resetTuple(row);
+    scanner.getValue(row);
+    // Assert.assertEquals("c2", row.get(0));
+
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  // normal one column projection
+  public void testOneColumnProjection() throws Exception {
+    System.out.println("testOneColumnProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection("c");
+
+    Tuple row = TypesUtils.createTuple(reader.getSchema());
+    TableScanner scanner = reader.getScanner(null, false);
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k1".getBytes()));
+    scanner.getValue(row);
+    Assert.assertEquals("c1", row.get(0));
+    try {
+      row.get(1);
+      Assert.fail("Failed to catch 'out of boundary' exceptions.");
+    } catch (ExecException e) {
+      // no op, expecting out of bounds exceptions
+    }
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k2".getBytes()));
+    TypesUtils.resetTuple(row);
+    scanner.getValue(row);
+    Assert.assertEquals("c2", row.get(0));
+
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  // normal one column plus a non-existent column projection
+  public void testOnePlusNonProjection() throws Exception {
+    System.out.println("testOnePlusNonProjection");
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection(",f");
+
+    Tuple row = TypesUtils.createTuple(reader.getSchema());
+    TableScanner scanner = reader.getScanner(null, false);
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k1".getBytes()));
+    scanner.getValue(row);
+    Assert.assertNull(row.get(0));
+    Assert.assertEquals("f1", row.get(1));
+    try {
+      row.get(2);
+      Assert.fail("Failed to catch 'out of boundary' exceptions.");
+    } catch (ExecException e) {
+      // no op, expecting out of bounds exceptions
+    }
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k2".getBytes()));
+    TypesUtils.resetTuple(row);
+    scanner.getValue(row);
+    Assert.assertEquals("f2", row.get(1));
+
+    scanner.close();
+    reader.close();
+  }
+
+  @Test
+  // two normal columns projected
+  public void testTwoColumnsProjection() throws Exception {
+    System.out.println("testTwoColumnsProjection");
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection("f,a");
+
+    Tuple row = TypesUtils.createTuple(reader.getSchema());
+    TableScanner scanner = reader.getScanner(null, false);
+    BytesWritable key = new BytesWritable();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k1".getBytes()));
+    scanner.getValue(row);
+    Assert.assertEquals("f1", row.get(0));
+    Assert.assertEquals("a1", row.get(1));
+    try {
+      row.get(2);
+      Assert.fail("Failed to catch 'out of boundary' exceptions.");
+    } catch (ExecException e) {
+      // no op, expecting out of bounds exceptions
+    }
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k2".getBytes()));
+    TypesUtils.resetTuple(row);
+    scanner.getValue(row);
+    Assert.assertEquals("f2", row.get(0));
+    Assert.assertEquals("a2", row.get(1));
+
+    scanner.close();
+    reader.close();
+  }
+}