Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC

svn commit: r803312 [12/16] - in /hadoop/pig/trunk: ./ contrib/zebra/ contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/ contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/ contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/...

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test reads with projections over scalar, record, and map column types.
+ * 
+ */
+public class TestSchema {
+  // final static String STR_SCHEMA =
+  // "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, r1:record(f1:int, f2:long),  m1:map(string), c:collection(f13:double, f14:float, f15:bytes)";
+  // final static String STR_STORAGE =
+  // "[s1, s2]; [r1.f1]; [s3, s4]; [s5, s6]; [r1.f2]; [c.f13]";
+  private static Configuration conf;
+  private static Path path;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), "TestSchema");
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+  }
+
+  /**
+   * Returns the name of the method that called getCurrentMethodName.
+   * 
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller of getCurrentMethodName>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller of getCurrentMethodName>'
+    return t;
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  @Test
+  public void testSimple() throws IOException, ParseException {
+    String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+    String STR_STORAGE = "[s1, s2]; [s3, s4]; [s5, s6]";
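+    // The storage hint above splits the six scalar columns into three column
+    // groups of two columns each.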
+
+    // Build Table and column groups
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    // insert data in row 1
+    int row = 0;
+    tuple.set(0, true); // bool
+    tuple.set(1, 1); // int
+    tuple.set(2, 1001L); // long
+    tuple.set(3, 1.1); // float
+    tuple.set(4, "hello world 1"); // string
+    tuple.set(5, new DataByteArray("hello byte 1")); // byte
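+    // Keys are formatted as "k<part+1><row+1>", so the two rows are keyed k11 and k12.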
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // insert data in row 2
+    row++;
+    tuple.set(0, false);
+    tuple.set(1, 2); // int
+    tuple.set(2, 1002L); // long
+    tuple.set(3, 3.1); // float
+    tuple.set(4, "hello world 2"); // string
+    tuple.set(5, new DataByteArray("hello byte 2")); // byte
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // finish building table, closing out the inserter, writer, writer1
+    inserter.close();
+    writer1.finish();
+    writer.close();
+
+    // Starting read
+    String projection = "s6,s5,s4,s3,s2,s1";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
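+    // rangeSplit(1) yields a single split covering the whole table.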
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(true, RowValue.get(5));
+    Assert.assertEquals(1, RowValue.get(4));
+    Assert.assertEquals(1.1, RowValue.get(2));
+    Assert.assertEquals("hello world 1", RowValue.get(1));
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(false, RowValue.get(5));
+    Assert.assertEquals(2, RowValue.get(4));
+    Assert.assertEquals(3.1, RowValue.get(2));
+    Assert.assertEquals("hello world 2", RowValue.get(1));
+
+    // Test stitching: the reader and splits can be reused, but not the scanner.
+    String projection2 = "s5, s1";
+    reader.setProjection(projection2);
+    scanner = reader.getScanner(splits.get(0), true);
+    RowValue = TypesUtils.createTuple(scanner.getSchema());
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals("hello world 1", RowValue.get(0));
+    Assert.assertEquals(true, RowValue.get(1));
+
+    scanner.advance();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals("hello world 2", RowValue.get(0));
+    Assert.assertEquals(false, RowValue.get(1));
+
+    reader.close();
+  }
+
+  @Test
+  public void testRecord() throws IOException, ParseException {
+    BasicTable.drop(path, conf);
+    String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))";
+    String STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
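+    // Nested record fields are addressed by dotted paths in the storage hint,
+    // e.g. r1.f1 and r2.r3.f4 each get a column group of their own.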
+
+    path = new Path(getCurrentMethodName());
+    // in case BasicTable exists
+    BasicTable.drop(path, conf);
+    System.out.println("in testRecord, get path: " + path.toString());
+    // Build Table and column groups
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    Tuple tupRecord1;
+    try {
+      tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple tupRecord2;
+    try {
+      tupRecord2 = TypesUtils.createTuple(schema.getColumnSchema("r2")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple tupRecord3;
+    try {
+      tupRecord3 = TypesUtils.createTuple(new Schema("f3:float, f4"));
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    // insert data in row 1
+    int row = 0;
+    // r1:record(f1:int, f2:long
+    tupRecord1.set(0, 1);
+    tupRecord1.set(1, 1001L);
+    tuple.set(0, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 1.3);
+    tupRecord3.set(1, new DataByteArray("r3 row1 byte array"));
+    tuple.set(1, tupRecord2);
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // row 2
+    row++;
+    TypesUtils.resetTuple(tuple);
+    TypesUtils.resetTuple(tupRecord1);
+    TypesUtils.resetTuple(tupRecord2);
+    TypesUtils.resetTuple(tupRecord3);
+    // r1:record(f1:int, f2:long
+    tupRecord1.set(0, 2);
+    tupRecord1.set(1, 1002L);
+    tuple.set(0, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 2.3);
+    tupRecord3.set(1, new DataByteArray("r3 row2 byte array"));
+    tuple.set(1, tupRecord2);
+
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // finish building table, closing out the inserter, writer, writer1
+    inserter.close();
+    writer1.finish();
+    writer.close();
+
+    // Start reading. Simple projection 0, not at the record-of-record level. PASS
+    String projection0 = "r1.f1, r1.f2";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection0);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(1, RowValue.get(0));
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(2, RowValue.get(0));
+
+    /*
+     * String STR_SCHEMA =
+     * "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))"; String
+     * STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
+     */
+    // Start reading. Simple projection 1, at the record-of-record level. FIXED,
+    // PASS on advance
+    String projection1 = "r2.r3.f4, r2.r3.f3";
+    reader.setProjection(projection1);
+    scanner = reader.getScanner(splits.get(0), true);
+    RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals("r3 row1 byte array", RowValue.get(0).toString());
+    Assert.assertEquals(1.3, RowValue.get(1));
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals("r3 row2 byte array", RowValue.get(0).toString());
+    Assert.assertEquals(2.3, RowValue.get(1));
+
+    /*
+     * String STR_SCHEMA =
+     * "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))"; String
+     * STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
+     */
+    // Test stitching across column groups: [r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]
+    String projection2 = "r1.f1, r2.r3.f3";
+    reader.setProjection(projection2);
+    scanner = reader.getScanner(splits.get(0), true);
+    RowValue = TypesUtils.createTuple(scanner.getSchema());
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(1, RowValue.get(0));
+    Assert.assertEquals(1.3, RowValue.get(1));
+
+    scanner.advance();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(2, RowValue.get(0));
+    Assert.assertEquals(2.3, RowValue.get(1));
+
+    /*
+     * String STR_SCHEMA =
+     * "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))"; String
+     * STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
+     */
+    // Test stitching across column groups: [r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]
+    String projection3 = "r1, r2";
+    reader.setProjection(projection3);
+    scanner = reader.getScanner(splits.get(0), true);
+    RowValue = TypesUtils.createTuple(scanner.getSchema());
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+
+    Assert.assertEquals(1, ((Tuple) RowValue.get(0)).get(0));
+    Assert.assertEquals(1001L, ((Tuple) RowValue.get(0)).get(1));
+    Assert.assertEquals("((1.3,r3 row1 byte array))", RowValue.get(1)
+        .toString());
+    Assert.assertEquals(1.3, ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(0));
+    Assert.assertEquals("r3 row1 byte array",
+        ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(1).toString());
+
+    scanner.advance();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(2, ((Tuple) RowValue.get(0)).get(0));
+    Assert.assertEquals(1002L, ((Tuple) RowValue.get(0)).get(1));
+    Assert.assertEquals("((2.3,r3 row2 byte array))", RowValue.get(1)
+        .toString());
+    Assert.assertEquals(2.3, ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(0));
+    Assert.assertEquals("r3 row2 byte array",
+        ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(1).toString());
+
+    // Negative test: project a non-existent column name. FAILED, it crashes
+    // here
+    String projection4 = "r3";
+    reader.setProjection(projection4);
+    scanner = reader.getScanner(splits.get(0), true);
+    RowValue = TypesUtils.createTuple(scanner.getSchema());
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    // scanner.getValue(RowValue); //FAILED, It crashes here
+
+    reader.close();
+
+  }
+
+  @Test
+  public void testMap() throws IOException, ParseException {
+    String STR_SCHEMA = "m1:map(string),m2:map(map(int))";
+    String STR_STORAGE = "[m1#{a}];[m2#{x|y}]; [m1#{b}, m2#{z}]";
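+    // The '#{...}' notation in the storage hint above routes individual map keys
+    // to column groups: m1 key a alone, m2 keys x and y together, and m1 key b
+    // with m2 key z.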
+
+    // String STR_SCHEMA = "m1:map(string)";
+    // String STR_STORAGE = "[m1#{a}]";
+    // Build Table and column groups
+    path = new Path(getCurrentMethodName());
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    // m1:map(string)
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put("a", "A");
+    m1.put("b", "B");
+    m1.put("c", "C");
+    tuple.set(0, m1);
+
+    // m2:map(map(int))
+    HashMap<String, Map> m2 = new HashMap<String, Map>();
+    Map<String, Integer> m3 = new HashMap<String, Integer>();
+    m3.put("m311", 311);
+    m3.put("m321", 321);
+    m3.put("m331", 331);
+    Map<String, Integer> m4 = new HashMap<String, Integer>();
+    m4.put("m411", 411);
+    m4.put("m421", 421);
+    m4.put("m431", 431);
+    m2.put("x", m3);
+    m2.put("y", m4);
+    tuple.set(1, m2);
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // row 2
+    row++;
+    TypesUtils.resetTuple(tuple);
+    m1.clear();
+    m2.clear();
+    m3.clear();
+    m4.clear();
+    // m1:map(string)
+    m1.put("a", "A2");
+    m1.put("b2", "B2");
+    m1.put("c2", "C2");
+    tuple.set(0, m1);
+
+    // m2:map(map(int))
+    m3.put("m321", 321);
+    m3.put("m322", 322);
+    m3.put("m323", 323);
+    m2.put("z", m3);
+    tuple.set(1, m2);
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // finish building table, closing out the inserter, writer, writer1
+    inserter.close();
+    writer1.finish();
+    writer.close();
+
+    // Start reading. Simple projection of a single map key. PASS
+    String projection = "m1#{a}";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals("{a=A}", RowValue.get(0).toString());
+
+    // If row 1 has a==>A and row 2 does not have key 'a', advancing the scanner
+    // will still return a==>A
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    System.out.println(RowValue.get(0).toString());
+    Assert.assertEquals("{a=A2}", RowValue.get(0).toString());
+
+    // m1:map(string),m2:map(map(int))";
+    // String STR_STORAGE = "[m1#{a}];[m2#{x|y}]; [m1#{b}, m2#{z}]";
+
+    // Test stitching: the reader and splits can be reused, but not the scanner.
+    String projection2 = "m1#{b}, m2#{x|z}";
+    reader.setProjection(projection2);
+    scanner = reader.getScanner(splits.get(0), true);
+    RowValue = TypesUtils.createTuple(scanner.getSchema());
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    // TODO Assert.assertEquals("{a=A}", RowValue.get(0).toString());
+    Assert.assertEquals("B", ((Map) RowValue.get(0)).get("b"));
+    Assert.assertEquals(321, ((Map) ((Map) RowValue.get(1)).get("x"))
+        .get("m321"));
+    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("z")));
+    // m2's x key:
+    /*
+     * m3.put("m311", 311); m3.put("m321", 321); m3.put("m331", 331);
+     */
+    System.out.println(RowValue.get(1).toString());
+
+    TypesUtils.resetTuple(RowValue);
+    scanner.advance();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(null, ((Map) RowValue.get(0)).get("b"));
+    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("x")));
+    Assert.assertEquals(323, ((Map) ((Map) RowValue.get(1)).get("z"))
+        .get("m323"));
+
+    reader.close();
+
+  }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on simple (scalar) column types.
+ * 
+ */
+public class TestSimple {
+
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+  final static String STR_STORAGE = "[s1, s2]; [s3, s4]; [s5, s6]";
+  private static Configuration conf;
+  private static Path path;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), "TestSimple");
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    // insert data in row 1
+    int row = 0;
+    tuple.set(0, true); // bool
+    tuple.set(1, 1); // int
+    tuple.set(2, 1001L); // long
+    tuple.set(3, 1.1); // float
+    tuple.set(4, "hello world 1"); // string
+    tuple.set(5, new DataByteArray("hello byte 1")); // byte
+
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // insert data in row 2
+    row++;
+    tuple.set(0, false);
+    tuple.set(1, 2); // int
+    tuple.set(2, 1002L); // long
+    tuple.set(3, 3.1); // float
+    tuple.set(4, "hello world 2"); // string
+    tuple.set(5, new DataByteArray("hello byte 2")); // byte
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // finish building table, closing out the inserter, writer, writer1
+    inserter.close();
+    writer1.finish();
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  // Test simple projection
+  @Test
+  public void testReadSimple1() throws IOException, ParseException {
+    String projection = "s6,s5,s4,s3,s2,s1";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(true, RowValue.get(5));
+    Assert.assertEquals(1, RowValue.get(4));
+    Assert.assertEquals(1001L, RowValue.get(3));
+    Assert.assertEquals(1.1, RowValue.get(2));
+    Assert.assertEquals("hello world 1", RowValue.get(1));
+    Assert.assertEquals("hello byte 1", RowValue.get(0).toString());
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals(false, RowValue.get(5));
+    Assert.assertEquals(2, RowValue.get(4));
+    Assert.assertEquals(1002L, RowValue.get(3));
+    Assert.assertEquals(3.1, RowValue.get(2));
+    Assert.assertEquals("hello world 2", RowValue.get(1));
+    Assert.assertEquals("hello byte 2", RowValue.get(0).toString());
+  }
+
+  // Test stitching.
+  @Test
+  public void testReadSimpleStitch() throws IOException, ParseException {
+    String projection2 = "s5, s1";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection2);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals("hello world 1", RowValue.get(0));
+    Assert.assertEquals(true, RowValue.get(1));
+
+    scanner.advance();
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(RowValue);
+    Assert.assertEquals("hello world 2", RowValue.get(0));
+    Assert.assertEquals(false, RowValue.get(1));
+
+    reader.close();
+  }
+
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on complicated column types.
+ * 
+ */
+public class TestWrite {
+  final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)), m1:map(string),m2:map(map(int)), c:collection(f13:double, f14:float, f15:bytes)";
+  final static String STR_STORAGE = "[s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3]; [s5, s6, m2#{x|y}]; [r1.f2, m1#{b}]; [r2.r3.f4, m2#{z}]";
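+  // The storage hint above mixes scalar columns, dotted record paths, and map
+  // keys ('#{...}') across seven explicit column groups.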
+  private static Configuration conf;
+  private static Path path;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), "TestWrite");
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    Tuple tupRecord1;
+    try {
+      tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple tupRecord2;
+    try {
+      tupRecord2 = TypesUtils.createTuple(schema.getColumnSchema("r2")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    Tuple tupRecord3;
+    try {
+      tupRecord3 = TypesUtils.createTuple(new Schema("f3:float, f4"));
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    // row 1
+    tuple.set(0, true); // bool
+    tuple.set(1, 1); // int
+    tuple.set(2, 1001L); // long
+    tuple.set(3, 1.1); // float
+    tuple.set(4, "hello world 1"); // string
+    tuple.set(5, new DataByteArray("hello byte 1")); // byte
+
+    // r1:record(f1:int, f2:long
+    tupRecord1.set(0, 1);
+    tupRecord1.set(1, 1001L);
+    tuple.set(6, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 1.3);
+    tupRecord3.set(1, new DataByteArray("r3 row 1 byte array "));
+    tuple.set(7, tupRecord2);
+
+    // m1:map(string)
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put("a", "A");
+    m1.put("b", "B");
+    m1.put("c", "C");
+    tuple.set(8, m1);
+
+    // m2:map(map(int))
+    HashMap<String, Map> m2 = new HashMap<String, Map>();
+    Map<String, Integer> m3 = new HashMap<String, Integer>();
+    m3.put("m311", 311);
+    m3.put("m321", 321);
+    m3.put("m331", 331);
+    Map<String, Integer> m4 = new HashMap<String, Integer>();
+    m4.put("m411", 411);
+    m4.put("m421", 421);
+    m4.put("m431", 431);
+    m2.put("x", m3);
+    m2.put("y", m4);
+    tuple.set(9, m2);
+
+    // c:collection(f13:double, f14:float, f15:bytes)
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(10).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(10, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // row 2
+    row++;
+    TypesUtils.resetTuple(tuple);
+    TypesUtils.resetTuple(tupRecord1);
+    TypesUtils.resetTuple(tupRecord2);
+    TypesUtils.resetTuple(tupRecord3);
+    m1.clear();
+    m2.clear();
+    m3.clear();
+    m4.clear();
+    tuple.set(0, false);
+    tuple.set(1, 2); // int
+    tuple.set(2, 1002L); // long
+    tuple.set(3, 3.1); // float
+    tuple.set(4, "hello world 2"); // string
+    tuple.set(5, new DataByteArray("hello byte 2")); // byte
+
+    // r1:record(f1:int, f2:long
+    tupRecord1.set(0, 2);
+    tupRecord1.set(1, 1002L);
+    tuple.set(6, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 2.3);
+    tupRecord3.set(1, new DataByteArray("r3 row2  byte array"));
+    tuple.set(7, tupRecord2);
+
+    // m1:map(string)
+    m1.put("a2", "A2");
+    m1.put("b2", "B2");
+    m1.put("c2", "C2");
+    tuple.set(8, m1);
+
+    // m2:map(map(int))
+    m3.put("m321", 321);
+    m3.put("m322", 322);
+    m3.put("m323", 323);
+    m2.put("z", m3);
+    tuple.set(9, m2);
+
+    // c:collection(f13:double, f14:float, f15:bytes)
+    bagColl.clear();
+    TypesUtils.resetTuple(tupColl1);
+    TypesUtils.resetTuple(tupColl2);
+    tupColl1.set(0, 7654.321);
+    tupColl1.set(1, 0.0001);
+    abs1[0] = 31;
+    abs1[1] = 32;
+    abs1[2] = 33;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 0.123456789);
+    tupColl2.set(1, 0.3333);
+    abs2[0] = 41;
+    abs2[1] = 42;
+    abs2[2] = 43;
+    abs2[3] = 44;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(10, bagColl);
+
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  @Test
+  public void test1() throws IOException, ParseException {
+    String projection = "r1.f2, s1";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+
+    Assert.assertEquals(1001L, value.get(0));
+    Assert.assertEquals(true, value.get(1));
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(value);
+    Assert.assertEquals(1002L, value.get(0));
+    Assert.assertEquals(false, value.get(1));
+
+    reader.close();
+  }
+
+  @Test
+  public void testStitch() throws IOException, ParseException {
+    String projection = "s1, r1";
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+
+    Tuple recordTuple = (Tuple) value.get(1);
+    Assert.assertEquals(1, recordTuple.get(0));
+    Assert.assertEquals(1001L, recordTuple.get(1));
+    Assert.assertEquals(true, value.get(0));
+    reader.close();
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.Flat;
+
+/**
+ * Generate some input text files.
+ */
+class ArticleGenerator {
+  Random random;
+  Dictionary dict;
+  int pageWidth;
+  DiscreteRNG lastLineLenGen;
+  DiscreteRNG paragraphLineLenGen;
+  DiscreteRNG paragraphLenGen;
+  long wordCount;
+  long lineCount;
+
+  /**
+   * Create an article generator.
+   * 
+   * @param dictWordCnt
+   *          Number of words in the dictionary.
+   * @param minWordLen
+   *          Minimum word length
+   * @param maxWordLen
+   *          Maximum word length
+   * @param pageWidth
+   *          Page width (the target line length).
+   */
+  ArticleGenerator(int dictWordCnt, int minWordLen, int maxWordLen,
+      int pageWidth) {
+    random = new Random(System.nanoTime());
+    dict = new Dictionary(random, dictWordCnt, minWordLen, maxWordLen, 100);
+    this.pageWidth = pageWidth;
+    lastLineLenGen = new Flat(random, 1, pageWidth);
+    paragraphLineLenGen = new Flat(random, pageWidth * 3 / 4, pageWidth);
+    paragraphLenGen = new Flat(random, 1, 40);
+  }
+
+  /**
+   * Create an article
+   * 
+   * @param fs
+   *          File system.
+   * @param path
+   *          path of the file
+   * @param length
+   *          Expected size of the file.
+   * @throws IOException
+   */
+  void createArticle(FileSystem fs, Path path, long length) throws IOException {
+    FSDataOutputStream fsdos = fs.create(path, false);
+    StringBuilder sb = new StringBuilder();
+    int remainLinesInParagraph = paragraphLenGen.nextInt();
+    while (fsdos.getPos() < length) {
+      if (remainLinesInParagraph == 0) {
+        remainLinesInParagraph = paragraphLenGen.nextInt();
+        fsdos.write('\n');
+      }
+      int lineLen = paragraphLineLenGen.nextInt();
+      if (--remainLinesInParagraph == 0) {
+        lineLen = lastLineLenGen.nextInt();
+      }
+      sb.setLength(0);
+      while (sb.length() < lineLen) {
+        if (sb.length() > 0) {
+          sb.append(' ');
+        }
+        sb.append(dict.nextWord());
+        ++wordCount;
+      }
+      sb.append('\n');
+      fsdos.write(sb.toString().getBytes());
+      ++lineCount;
+    }
+    fsdos.close();
+  }
+
+  /**
+   * Create a bunch of files under the same directory.
+   * 
+   * @param fs
+   *          File system
+   * @param parent
+   *          directory where files should be created
+   * @param prefix
+   *          prefix name of the files
+   * @param n
+   *          total number of files
+   * @param length
+   *          length of each file.
+   * @throws IOException
+   */
+  void batchArticalCreation(FileSystem fs, Path parent, String prefix, int n,
+      long length) throws IOException {
+    for (int i = 0; i < n; ++i) {
+      createArticle(fs, new Path(parent, String.format("%s%06d", prefix, i)),
+          length);
+    }
+  }
+
+  static class Summary {
+    long wordCount;
+    long lineCount;
+    Map<String, Long> wordCntDist;
+
+    Summary() {
+      wordCntDist = new HashMap<String, Long>();
+    }
+  }
+
+  void resetSummary() {
+    wordCount = 0;
+    lineCount = 0;
+    dict.resetWordCnts();
+  }
+
+  Summary getSummary() {
+    Summary ret = new Summary();
+    ret.wordCount = wordCount;
+    ret.lineCount = lineCount;
+    ret.wordCntDist = dict.getWordCounts();
+    return ret;
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.io.file.tfile.RandomDistribution.Binomial;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.Zipf;
+
+/**
+ * A dictionary that generates English-like words whose frequencies follow a
+ * Zipf distribution and whose lengths follow a binomial distribution.
+ */
+class Dictionary {
+  private static final double BINOMIAL_P = 0.3;
+  private static final double SIGMA = 1.1;
+  private final int lead;
+  private final Zipf zipf;
+  private final String[] dict;
+  private final long[] wordCnts;
+
+  private static String makeWord(DiscreteRNG rng, Random random) {
+    int len = rng.nextInt();
+    StringBuilder sb = new StringBuilder(len);
+    for (int i = 0; i < len; ++i) {
+      sb.append((char) ('a' + random.nextInt(26)));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param entries
+   *          How many words exist in the dictionary.
+   * @param minWordLen
+   *          Minimum word length.
+   * @param maxWordLen
+   *          Maximum word length.
+   * @param freqRatio
+   *          Expected ratio between the most frequent words and the least
+   *          frequent words. (e.g. 100)
+   */
+  public Dictionary(Random random, int entries, int minWordLen, int maxWordLen,
+      int freqRatio) {
+    Binomial binomial = new Binomial(random, minWordLen, maxWordLen, BINOMIAL_P);
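+    // Choose 'lead' so that, under Zipf (frequency ~ 1/rank^SIGMA), the expected
+    // ratio between the most and least frequent words is roughly freqRatio:
+    // ((entries + lead) / (lead + 1))^SIGMA ~= freqRatio.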
+    lead = Math.max(0,
+        (int) (entries / (Math.exp(Math.log(freqRatio) / SIGMA) - 1)) - 1);
+    zipf = new Zipf(random, lead, entries + lead, SIGMA);
+    dict = new String[entries];
+    // Use a set to ensure no dup words in dictionary
+    Set<String> dictTmp = new HashSet<String>();
+    for (int i = 0; i < entries; ++i) {
+      while (true) {
+        String word = makeWord(binomial, random);
+        if (!dictTmp.contains(word)) {
+          dictTmp.add(word);
+          dict[i] = word;
+          break;
+        }
+      }
+    }
+    wordCnts = new long[dict.length];
+  }
+
+  /**
+   * Get the next word from the dictionary.
+   * 
+   * @return The next word from the dictionary.
+   */
+  public String nextWord() {
+    int index = zipf.nextInt() - lead;
+    ++wordCnts[index];
+    return dict[index];
+  }
+
+  public void resetWordCnts() {
+    for (int i = 0; i < wordCnts.length; ++i) {
+      wordCnts[i] = 0;
+    }
+  }
+
+  public Map<String, Long> getWordCounts() {
+    Map<String, Long> ret = new HashMap<String, Long>();
+    for (int i = 0; i < dict.length; ++i) {
+      if (wordCnts[i] > 0) {
+        ret.put(dict[i], wordCnts[i]);
+      }
+    }
+    return ret;
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A complete sample MapReduce program for Table. It does not contain the
+ * 'read' part, but that should be similar and easier to write. Refer to the
+ * test cases in the same directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * this 2
+ * is 1
+ * a 4 
+ * test 2 
+ * hello 1 
+ * world 3
+ * </pre>
+ * 
+ */
+public class TableMRSample {
+  static class MapClass implements
+      Mapper<LongWritable, Text, BytesWritable, Tuple> {
+    private BytesWritable bytesKey;
+    private Tuple tupleRow;
+
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+        throws IOException {
+
+      // value should contain "word count"
+      String[] wdct = value.toString().split(" ");
+      if (wdct.length != 2) {
+        // LOG the error
+        return;
+      }
+
+      byte[] word = wdct[0].getBytes();
+      bytesKey.set(word, 0, word.length);
+      tupleRow.set(0, new String(word));
+      tupleRow.set(1, Integer.parseInt(wdct[1]));
+
+      output.collect(bytesKey, tupleRow);
+    }
+
+    @Override
+    public void configure(JobConf job) {
+      bytesKey = new BytesWritable();
+      try {
+        Schema outSchema = BasicTableOutputFormat.getSchema(job);
+        tupleRow = TypesUtils.createTuple(outSchema);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (ParseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+  }
+
+  public static void main(String[] args) throws ParseException, IOException {
+    JobConf jobConf = new JobConf();
+    jobConf.setJobName("tableMRSample");
+    jobConf.set("table.output.tfile.compression", "gz");
+
+    // input settings
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setMapperClass(TableMRSample.MapClass.class);
+    FileInputFormat.setInputPaths(jobConf, new Path(
+        "/user/joe/inputdata/input.txt"));
+    jobConf.setNumMapTasks(2);
+
+    // output settings
+    Path outPath = new Path("/user/joe/outputdata/");
+    jobConf.setOutputFormat(BasicTableOutputFormat.class);
+    BasicTableOutputFormat.setOutputPath(jobConf, outPath);
+    // set the logical schema with 2 columns
+    BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
+    // for demo purposes, create 2 physical column groups
+    BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
+
+    // set map-only job.
+    jobConf.setNumReduceTasks(0);
+    JobClient.runJob(jobConf);
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A complete sample MapReduce program for Table. It does not contain the
+ * 'read' part, but that should be similar and easier to write. Refer to the
+ * test cases in the same directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * this 2
+ * is 1
+ * a 4 
+ * test 2 
+ * hello 1 
+ * world 3
+ * </pre>
+ * 
+ */
+public class TableMRSample1 {
+  static class MapClass implements
+      Mapper<LongWritable, Text, BytesWritable, Tuple> {
+    private BytesWritable bytesKey;
+    private Tuple tupleRow;
+
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+        throws IOException {
+
+      // value should contain "word count"
+      String[] wdct = value.toString().split(" ");
+      if (wdct.length != 2) {
+        // LOG the error
+        throw new IOException("Does not contain two fields");
+      }
+
+      byte[] word = wdct[0].getBytes();
+      bytesKey.set(word, 0, word.length);
+      tupleRow.set(0, new String(word));
+      tupleRow.set(1, Integer.parseInt(wdct[1]));
+
+      output.collect(bytesKey, tupleRow);
+    }
+
+    @Override
+    public void configure(JobConf job) {
+      bytesKey = new BytesWritable();
+      try {
+        Schema outSchema = BasicTableOutputFormat.getSchema(job);
+        tupleRow = TypesUtils.createTuple(outSchema);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (ParseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+
+  }
+
+  public static void main(String[] args) throws ParseException, IOException {
+    JobConf jobConf = new JobConf();
+    jobConf.setJobName("tableMRSample");
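+    // store the table's underlying TFiles gzip-compressed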
+    jobConf.set("table.output.tfile.compression", "gz");
+
+    // input settings
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setMapperClass(TableMRSample1.MapClass.class);
+    FileInputFormat.setInputPaths(jobConf, new Path("/user/joe/input.txt"));
+    jobConf.setNumMapTasks(2);
+
+    // output settings
+    Path outPath = new Path("/user/joe/tableout");
+    jobConf.setOutputFormat(BasicTableOutputFormat.class);
+    BasicTableOutputFormat.setOutputPath(jobConf, outPath);
+    // set the logical schema with 2 columns
+    BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
+    // for demo purposes, create 2 physical column groups
+    BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
+
+    // set map-only job.
+    jobConf.setNumReduceTasks(0);
+    JobClient.runJob(jobConf);
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This sample shows how to use Zebra tables to perform a simple union in
+ * map/reduce. To run it, two basic tables must already exist. They contain
+ * the same kind of data as in Sample 1, i.e., (word, count). In this example
+ * they live at /homes/chaow/mapredu/t1 and /homes/chaow/mapredu/t2, and the
+ * resulting table is written to /homes/chaow/mapredu2/t1.
+ * 
+ */
+public class TableMRSample2 {
+  static class MapClass implements
+      Mapper<BytesWritable, Tuple, BytesWritable, Tuple> {
+    private BytesWritable bytesKey;
+    private Tuple tupleRow;
+
+    @Override
+    public void map(BytesWritable key, Tuple value,
+        OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+        throws IOException
+
+    {
+      System.out.println(key.toString() + " : " + value.toString());
+      output.collect(key, value);
+    }
+
+    @Override
+    public void configure(JobConf job) {
+      bytesKey = new BytesWritable();
+      try {
+        Schema outSchema = BasicTableOutputFormat.getSchema(job);
+        tupleRow = TypesUtils.createTuple(outSchema);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (ParseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+
+    public static void main(String[] args) throws ParseException, IOException {
+      JobConf jobConf = new JobConf();
+      jobConf.setJobName("tableMRSample");
+      jobConf.set("table.output.tfile.compression", "gz");
+
+      // input settings
+      jobConf.setInputFormat(TableInputFormat.class);
+      jobConf.setOutputFormat(BasicTableOutputFormat.class);
+      jobConf.setMapperClass(TableMRSample2.MapClass.class);
+
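+      // feeding two table paths to TableInputFormat produces a simple union of their rows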
+      List<Path> paths = new ArrayList<Path>(2);
+      Path p = new Path("/homes/chaow/mapredu/t1");
+      System.out.println("path = " + p);
+      paths.add(p);
+      p = new Path("/homes/chaow/mapredu/t2");
+      paths.add(p);
+
+      TableInputFormat.setInputPaths(jobConf, paths.toArray(new Path[2]));
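+      // project only the "word" column from each input table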
+      TableInputFormat.setProjection(jobConf, "word");
+      BasicTableOutputFormat.setOutputPath(jobConf, new Path(
+          "/homes/chaow/mapredu2/t1"));
+
+      BasicTableOutputFormat.setSchema(jobConf, "word:string");
+      BasicTableOutputFormat.setStorageHint(jobConf, "[word]");
+
+      // set map-only job.
+      jobConf.setNumReduceTasks(0);
+      jobConf.setNumMapTasks(2);
+      JobClient.runJob(jobConf);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * <code>TableMapReduceExample</code> is a map-reduce example for Table Input/Output Format.
+ * <p/>
+ * Schema for Table is set to two columns containing Word of type <i>string</i> and Count of type <i>int</i> using <code> BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int"); </code>
+ * <p/>
+ * Hint for creation of Column Groups is specified using
+ * <code>  BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]"); </code>
+ * . Here we have two column groups.
+ * <p/>
+ * Input file should contain rows of word and count, separated by a space. For
+ * example:
+ * 
+ * <pre>
+ * this 2
+ * is 1
+ * a 5
+ * test 2
+ * hello 1
+ * world 3
+ * </pre>
+ * <p/>
+ * <p>
+ * The second job reads the output of the first job, which is in Table format.
+ * Here we specify <i>count</i> as the projection column. Table Input Format
+ * projects each input row, which contains both word and count, into a row
+ * containing only the count column and hands it to the map.
+ * <p/>
+ * The reducer sums the counts and produces a total that should match the
+ * number of words in the original text.
+ */
+
+public class TableMapReduceExample extends Configured implements Tool {
+
+  static class Map extends MapReduceBase implements
+      Mapper<LongWritable, Text, BytesWritable, Tuple> {
+    private BytesWritable bytesKey;
+    private Tuple tupleRow;
+
+    /**
+     * Map method for reading input.
+     */
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+        throws IOException {
+
+      // value should contain "word count"
+      String[] wordCount = value.toString().split(" ");
+      if (wordCount.length != 2) {
+        // LOG the error
+        throw new IOException("Value does not contain two fields:" + value);
+      }
+
+      byte[] word = wordCount[0].getBytes();
+      bytesKey.set(word, 0, word.length);
+      tupleRow.set(0, new String(word));
+      tupleRow.set(1, Integer.parseInt(wordCount[1]));
+
+      output.collect(bytesKey, tupleRow);
+    }
+
+    /**
+     * Configuration of the job. Here we create an empty Tuple Row.
+     */
+    @Override
+    public void configure(JobConf job) {
+      bytesKey = new BytesWritable();
+      try {
+        Schema outSchema = BasicTableOutputFormat.getSchema(job);
+        tupleRow = TypesUtils.createTuple(outSchema);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (ParseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  static class ProjectionMap extends MapReduceBase implements
+      Mapper<BytesWritable, Tuple, Text, IntWritable> {
+    private final static Text all = new Text("All");
+
+    /**
+     * Map method which gets count column after projection.
+     * 
+     * @throws IOException
+     */
+    @Override
+    public void map(BytesWritable key, Tuple value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      output.collect(all, new IntWritable((Integer) value.get(0)));
+    }
+  }
+
+  public static class ProjectionReduce extends MapReduceBase implements
+      Reducer<Text, IntWritable, Text, IntWritable> {
+    /**
+     * Reduce method which implements summation. Acts as both reducer and
+     * combiner.
+     * 
+     * @throws IOException
+     */
+    public void reduce(Text key, Iterator<IntWritable> values,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      output.collect(key, new IntWritable(sum));
+    }
+  }
+
+  /**
+   * Configures and runs the two jobs in sequence.
+   * 
+   * @param args
+   *          command-line arguments, excluding the generic options already
+   *          handled by {@link ToolRunner}.
+   */
+  public int run(String[] args) throws Exception {
+    if (args == null || args.length != 3) {
+      System.out
+          .println("usage: TableMapReduceExample input_path_for_text_file output_path_for_table output_path_for_text_file");
+      return -1;
+    }
+
+    /*
+     * First MR Job creating a Table with two columns
+     */
+    JobConf jobConf = new JobConf();
+    jobConf.setJobName("TableMapReduceExample");
+    jobConf.set("table.output.tfile.compression", "none");
+
+    // Input settings
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setMapperClass(Map.class);
+    FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
+
+    // Output settings
+    jobConf.setOutputFormat(BasicTableOutputFormat.class);
+    BasicTableOutputFormat.setOutputPath(jobConf, new Path(args[1]));
+
+    // set the logical schema with 2 columns
+    BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
+
+    // for demo purposes, create 2 physical column groups
+    BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
+
+    // set map-only job.
+    jobConf.setNumReduceTasks(0);
+
+    // Run Job
+    JobClient.runJob(jobConf);
+
+    /*
+     * Second MR Job for Table Projection of count column
+     */
+    JobConf projectionJobConf = new JobConf();
+    projectionJobConf.setJobName("TableProjectionMapReduceExample");
+
+    // Input settings
+    projectionJobConf.setMapperClass(ProjectionMap.class);
+    projectionJobConf.setInputFormat(TableInputFormat.class);
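+    // read back only the count column; each projected row handed to ProjectionMap has a single field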
+    TableInputFormat.setProjection(projectionJobConf, "count");
+    TableInputFormat.setInputPaths(projectionJobConf, new Path(args[1]));
+    projectionJobConf.setMapOutputKeyClass(Text.class);
+    projectionJobConf.setMapOutputValueClass(IntWritable.class);
+
+    // Output settings
+    projectionJobConf.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(projectionJobConf, new Path(args[2]));
+    projectionJobConf.setReducerClass(ProjectionReduce.class);
+    projectionJobConf.setCombinerClass(ProjectionReduce.class);
+
+    // Run Job
+    JobClient.runJob(projectionJobConf);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TableMapReduceExample(),
+        args);
+    System.exit(res);
+  }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Testing BasicTableOutputFormat and TableInputFormat using LocalFS with larger
+ * settings.
+ */
+public class TestBasicTableIOFMoreLocal extends TestBasicTableIOFormatLocalFS {
+  @Override
+  protected void setUp() throws IOException {
+    LOG = LogFactory.getLog(TestBasicTableIOFMoreLocal.class.getName());
+
+    if (options == null) {
+      options = new Options();
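+      // larger settings than the base test: more source files and more map/reduce tasks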
+      options.srcFiles = 20;
+      options.numBatches = 1;
+      options.numMapper = 5;
+      options.numReducer = 4;
+    }
+
+    super.setUp();
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Testing BasicTableOutputFormat and TableInputFormat using DFS
+ */
+public class TestBasicTableIOFormatDFS extends TestBasicTableIOFormatLocalFS {
+  @Override
+  protected void setUp() throws IOException {
+    LOG = LogFactory.getLog(TestBasicTableIOFormatDFS.class.getName());
+
+    if (options == null) {
+      options = new Options();
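+      // exercise the same tests against DFS instead of the local file system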
+      options.localFS = false;
+    }
+
+    super.setUp();
+  }
+}