Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC
svn commit: r803312 [12/16] - in /hadoop/pig/trunk: ./ contrib/zebra/
contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/
contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/
contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/...
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * Test projections on complicated column types.
+ *
+ */
+public class TestSchema {
+ // final static String STR_SCHEMA =
+ // "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, r1:record(f1:int, f2:long), m1:map(string), c:collection(f13:double, f14:float, f15:bytes)";
+ // final static String STR_STORAGE =
+ // "[s1, s2]; [r1.f1]; [s3, s4]; [s5, s6]; [r1.f2]; [c.f13]";
+ private static Configuration conf;
+ private static Path path;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), "TestSchema");
+ fs = path.getFileSystem(conf);
+ // drop any previous tables
+ BasicTable.drop(path, conf);
+ }
+
+ /**
+ * Return the name of the routine that called getCurrentMethodName
+ *
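+ * (On Java 5+, Thread.currentThread().getStackTrace() would arguably be a
+ * simpler route to the same information; this helper instead parses a
+ * printed stack trace.)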
+ */
+ public String getCurrentMethodName() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ (new Throwable()).printStackTrace(pw);
+ pw.flush();
+ String stackTrace = baos.toString();
+ pw.close();
+
+ StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+ tok.nextToken(); // 'java.lang.Throwable'
+ tok.nextToken(); // 'at ...getCurrentMethodName'
+ String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+ // Parse line 3
+ tok = new StringTokenizer(l.trim(), " <(");
+ String t = tok.nextToken(); // 'at'
+ t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+ return t;
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ BasicTable.drop(path, conf);
+ }
+
+ @Test
+ public void testSimple() throws IOException, ParseException {
+ String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+ String STR_STORAGE = "[s1, s2]; [s3, s4]; [s5, s6]";
+
+ // Build Table and column groups
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+ writer.finish();
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+ int part = 0;
+ TableInserter inserter = writer1.getInserter("part" + part, true);
+ TypesUtils.resetTuple(tuple);
+
+ // insert data in row 1
+ int row = 0;
+ tuple.set(0, true); // bool
+ tuple.set(1, 1); // int
+ tuple.set(2, 1001L); // long
+ tuple.set(3, 1.1); // float
+ tuple.set(4, "hello world 1"); // string
+ tuple.set(5, new DataByteArray("hello byte 1")); // byte
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // insert data in row 2
+ row++;
+ tuple.set(0, false);
+ tuple.set(1, 2); // int
+ tuple.set(2, 1002L); // long
+ tuple.set(3, 3.1); // float
+ tuple.set(4, "hello world 2"); // string
+ tuple.set(5, new DataByteArray("hello byte 2")); // byte
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // finish building table, closing out the inserter, writer, writer1
+ inserter.close();
+ writer1.finish();
+ writer.close();
+
+ // Starting read
+ String projection = new String("s6,s5,s4,s3,s2,s1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(true, RowValue.get(5));
+ Assert.assertEquals(1, RowValue.get(4));
+ Assert.assertEquals(1.1, RowValue.get(2));
+ Assert.assertEquals("hello world 1", RowValue.get(1));
+
+ scanner.advance();
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(false, RowValue.get(5));
+ Assert.assertEquals(2, RowValue.get(4));
+ Assert.assertEquals(3.1, RowValue.get(2));
+ Assert.assertEquals("hello world 2", RowValue.get(1));
+
+ // test stitch: the reader and splits can be reused, but NOT the scanner
+ String projection2 = new String("s5, s1");
+ reader.setProjection(projection2);
+ scanner = reader.getScanner(splits.get(0), true);
+ RowValue = TypesUtils.createTuple(scanner.getSchema());
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals("hello world 1", RowValue.get(0));
+ Assert.assertEquals(true, RowValue.get(1));
+
+ scanner.advance();
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals("hello world 2", RowValue.get(0));
+ Assert.assertEquals(false, RowValue.get(1));
+
+ reader.close();
+ }
+
+ @Test
+ public void testRecord() throws IOException, ParseException {
+ BasicTable.drop(path, conf);
+ String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))";
+ String STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
+
+ path = new Path(getCurrentMethodName());
+ // in case BasicTable exists
+ BasicTable.drop(path, conf);
+ System.out.println("in testRecord, get path: " + path.toString());
+ // Build Table and column groups
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+ writer.finish();
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+ int part = 0;
+ TableInserter inserter = writer1.getInserter("part" + part, true);
+ TypesUtils.resetTuple(tuple);
+
+ Tuple tupRecord1;
+ try {
+ tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+ .getSchema());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ Tuple tupRecord2;
+ try {
+ tupRecord2 = TypesUtils.createTuple(schema.getColumnSchema("r2")
+ .getSchema());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ Tuple tupRecord3;
+ try {
+ tupRecord3 = TypesUtils.createTuple(new Schema("f3:float, f4"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ // insert data in row 1
+ int row = 0;
+ // r1:record(f1:int, f2:long
+ tupRecord1.set(0, 1);
+ tupRecord1.set(1, 1001L);
+ tuple.set(0, tupRecord1);
+
+ // r2:record(r3:record(f3:float, f4))
+ tupRecord2.set(0, tupRecord3);
+ tupRecord3.set(0, 1.3);
+ tupRecord3.set(1, new DataByteArray("r3 row1 byte array"));
+ tuple.set(1, tupRecord2);
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // row 2
+ row++;
+ TypesUtils.resetTuple(tuple);
+ TypesUtils.resetTuple(tupRecord1);
+ TypesUtils.resetTuple(tupRecord2);
+ TypesUtils.resetTuple(tupRecord3);
+ // r1:record(f1:int, f2:long
+ tupRecord1.set(0, 2);
+ tupRecord1.set(1, 1002L);
+ tuple.set(0, tupRecord1);
+
+ // r2:record(r3:record(f3:float, f4))
+ tupRecord2.set(0, tupRecord3);
+ tupRecord3.set(0, 2.3);
+ tupRecord3.set(1, new DataByteArray("r3 row2 byte array"));
+ tuple.set(1, tupRecord2);
+
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // finish building table, closing out the inserter, writer, writer1
+ inserter.close();
+ writer1.finish();
+ writer.close();
+
+ // Start reading: simple projection 0, not at the record-of-record level. PASS
+ String projection0 = new String("r1.f1, r1.f2");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection0);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(1, RowValue.get(0));
+
+ scanner.advance();
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(2, RowValue.get(0));
+
+ /*
+ * String STR_SCHEMA =
+ * "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))"; String
+ * STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
+ */
+ // Start reading: simple projection 1, at the record-of-record level.
+ // FIXED; PASSes on advance
+ String projection1 = new String("r2.r3.f4, r2.r3.f3");
+ reader.setProjection(projection1);
+ scanner = reader.getScanner(splits.get(0), true);
+ RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals("r3 row1 byte array", RowValue.get(0).toString());
+ Assert.assertEquals(1.3, RowValue.get(1));
+
+ scanner.advance();
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals("r3 row2 byte array", RowValue.get(0).toString());
+ Assert.assertEquals(2.3, RowValue.get(1));
+
+ /*
+ * String STR_SCHEMA =
+ * "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))"; String
+ * STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
+ */
+ // test stitch, [r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]
+ String projection2 = new String("r1.f1, r2.r3.f3");
+ reader.setProjection(projection2);
+ scanner = reader.getScanner(splits.get(0), true);
+ RowValue = TypesUtils.createTuple(scanner.getSchema());
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(1, RowValue.get(0));
+ Assert.assertEquals(1.3, RowValue.get(1));
+
+ scanner.advance();
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(2, RowValue.get(0));
+ Assert.assertEquals(2.3, RowValue.get(1));
+
+ /*
+ * String STR_SCHEMA =
+ * "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))"; String
+ * STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
+ */
+ // test stitch, [r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]
+ String projection3 = new String("r1, r2");
+ reader.setProjection(projection3);
+ scanner = reader.getScanner(splits.get(0), true);
+ RowValue = TypesUtils.createTuple(scanner.getSchema());
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+
+ Assert.assertEquals(1, ((Tuple) RowValue.get(0)).get(0));
+ Assert.assertEquals(1001L, ((Tuple) RowValue.get(0)).get(1));
+ Assert.assertEquals("((1.3,r3 row1 byte array))", RowValue.get(1)
+ .toString());
+ Assert.assertEquals(1.3, ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(0));
+ Assert.assertEquals("r3 row1 byte array",
+ ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(1).toString());
+
+ scanner.advance();
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(2, ((Tuple) RowValue.get(0)).get(0));
+ Assert.assertEquals(1002L, ((Tuple) RowValue.get(0)).get(1));
+ Assert.assertEquals("((2.3,r3 row2 byte array))", RowValue.get(1)
+ .toString());
+ Assert.assertEquals(2.3, ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(0));
+ Assert.assertEquals("r3 row2 byte array",
+ ((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(1).toString());
+
+ // negative test: project a non-existent column name. FAILED; it crashes
+ // here
+ String projection4 = new String("r3");
+ reader.setProjection(projection4);
+ scanner = reader.getScanner(splits.get(0), true);
+ RowValue = TypesUtils.createTuple(scanner.getSchema());
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ // scanner.getValue(RowValue); //FAILED, It crashes here
+
+ reader.close();
+
+ }
+
+ @Test
+ public void testMap() throws IOException, ParseException {
+ String STR_SCHEMA = "m1:map(string),m2:map(map(int))";
+ String STR_STORAGE = "[m1#{a}];[m2#{x|y}]; [m1#{b}, m2#{z}]";
+
+ // String STR_SCHEMA = "m1:map(string)";
+ // String STR_STORAGE = "[m1#{a}]";
+ // Build Table and column groups
+ path = new Path(getCurrentMethodName());
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+ writer.finish();
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+ int part = 0;
+ TableInserter inserter = writer1.getInserter("part" + part, true);
+ TypesUtils.resetTuple(tuple);
+
+ // m1:map(string)
+ Map<String, String> m1 = new HashMap<String, String>();
+ m1.put("a", "A");
+ m1.put("b", "B");
+ m1.put("c", "C");
+ tuple.set(0, m1);
+
+ // m2:map(map(int))
+ HashMap<String, Map> m2 = new HashMap<String, Map>();
+ Map<String, Integer> m3 = new HashMap<String, Integer>();
+ m3.put("m311", 311);
+ m3.put("m321", 321);
+ m3.put("m331", 331);
+ Map<String, Integer> m4 = new HashMap<String, Integer>();
+ m4.put("m411", 411);
+ m4.put("m421", 421);
+ m4.put("m431", 431);
+ m2.put("x", m3);
+ m2.put("y", m4);
+ tuple.set(1, m2);
+ int row = 0;
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // row 2
+ row++;
+ TypesUtils.resetTuple(tuple);
+ m1.clear();
+ m2.clear();
+ m3.clear();
+ m4.clear();
+ // m1:map(string)
+ m1.put("a", "A2");
+ m1.put("b2", "B2");
+ m1.put("c2", "C2");
+ tuple.set(0, m1);
+
+ // m2:map(map(int))
+ m3.put("m321", 321);
+ m3.put("m322", 322);
+ m3.put("m323", 323);
+ m2.put("z", m3);
+ tuple.set(1, m2);
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // finish building table, closing out the inserter, writer, writer1
+ inserter.close();
+ writer1.finish();
+ writer.close();
+
+ // Start reading: simple read of one map key. PASS
+ String projection = new String("m1#{a}");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ reader.close();
+ reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals("{a=A}", RowValue.get(0).toString());
+
+ // If row 1 has a==>A, and row 2 doesn't have key a, scanner advance
+ // will still return a==>A
+ scanner.advance();
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println(RowValue.get(0).toString());
+ Assert.assertEquals("{a=A2}", RowValue.get(0).toString());
+
+ // m1:map(string),m2:map(map(int))";
+ // String STR_STORAGE = "[m1#{a}];[m2#{x|y}]; [m1#{b}, m2#{z}]";
+
+ // test stitch: the reader and splits can be reused, but NOT the scanner
+ String projection2 = new String("m1#{b}, m2#{x|z}");
+ reader.setProjection(projection2);
+ scanner = reader.getScanner(splits.get(0), true);
+ RowValue = TypesUtils.createTuple(scanner.getSchema());
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ // TODO Assert.assertEquals("{a=A}", RowValue.get(0).toString());
+ Assert.assertEquals("B", ((Map) RowValue.get(0)).get("b"));
+ Assert.assertEquals(321, ((Map) ((Map) RowValue.get(1)).get("x"))
+ .get("m321"));
+ Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("z")));
+ // m2's x key:
+ /*
+ * m3.put("m311", 311); m3.put("m321", 321); m3.put("m331", 331);
+ */
+ System.out.println(RowValue.get(1).toString());
+
+ TypesUtils.resetTuple(RowValue);
+ scanner.advance();
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(null, ((Map) RowValue.get(0)).get("b"));
+ Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("x")));
+ Assert.assertEquals(323, ((Map) ((Map) RowValue.get(1)).get("z"))
+ .get("m323"));
+
+ reader.close();
+
+ }
+}
\ No newline at end of file
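
Distilled from the test above, a minimal sketch of the Zebra write-then-read
cycle: create the table with a schema and storage hint, insert through a
re-opened writer, then read back through a projection. The path, schema, key,
and values below are illustrative assumptions, not values from the commit.

  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.zebra.io.BasicTable;
  import org.apache.hadoop.zebra.io.TableInserter;
  import org.apache.hadoop.zebra.io.TableScanner;
  import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
  import org.apache.hadoop.zebra.types.TypesUtils;
  import org.apache.pig.data.Tuple;

  public class ZebraRoundTrip {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Path path = new Path("ZebraRoundTrip");   // illustrative path
      BasicTable.drop(path, conf);              // drop any previous table

      // Create the table: logical schema plus physical column groups.
      BasicTable.Writer writer = new BasicTable.Writer(path,
          "a:int, b:string", "[a]; [b]", false, conf);
      writer.finish();

      // Re-open for insertion, mirroring the writer/writer1 pattern above.
      BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
      TableInserter inserter = writer1.getInserter("part0", true);
      Tuple t = TypesUtils.createTuple(writer.getSchema());
      t.set(0, 42);
      t.set(1, "forty-two");
      inserter.insert(new BytesWritable("k1".getBytes()), t);
      inserter.close();
      writer1.finish();
      writer.close();

      // Read back through a projection that reorders the columns.
      BasicTable.Reader reader = new BasicTable.Reader(path, conf);
      reader.setProjection("b,a");
      List<RangeSplit> splits = reader.rangeSplit(1);
      TableScanner scanner = reader.getScanner(splits.get(0), true);
      Tuple row = TypesUtils.createTuple(scanner.getSchema());
      BytesWritable key = new BytesWritable();
      scanner.getKey(key);
      scanner.getValue(row);
      System.out.println(row.get(0) + " / " + row.get(1)); // forty-two / 42
      reader.close();
    }
  }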
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * Test projections on simple column types.
+ *
+ */
+public class TestSimple {
+
+ final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+ final static String STR_STORAGE = "[s1, s2]; [s3, s4]; [s5, s6]";
+ private static Configuration conf;
+ private static Path path;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), "TestSimple");
+ fs = path.getFileSystem(conf);
+ // drop any previous tables
+ BasicTable.drop(path, conf);
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+ writer.finish();
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+ int part = 0;
+ TableInserter inserter = writer1.getInserter("part" + part, true);
+ TypesUtils.resetTuple(tuple);
+
+ // insert data in row 1
+ int row = 0;
+ tuple.set(0, true); // bool
+ tuple.set(1, 1); // int
+ tuple.set(2, 1001L); // long
+ tuple.set(3, 1.1); // float
+ tuple.set(4, "hello world 1"); // string
+ tuple.set(5, new DataByteArray("hello byte 1")); // byte
+
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // insert data in row 2
+ row++;
+ tuple.set(0, false);
+ tuple.set(1, 2); // int
+ tuple.set(2, 1002L); // long
+ tuple.set(3, 3.1); // float
+ tuple.set(4, "hello world 2"); // string
+ tuple.set(5, new DataByteArray("hello byte 2")); // byte
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // finish building table, closing out the inserter, writer, writer1
+ inserter.close();
+ writer1.finish();
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ BasicTable.drop(path, conf);
+ }
+
+ // Test simple projection
+ @Test
+ public void testReadSimple1() throws IOException, ParseException {
+ String projection = new String("s6,s5,s4,s3,s2,s1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(true, RowValue.get(5));
+ Assert.assertEquals(1, RowValue.get(4));
+ Assert.assertEquals(1001L, RowValue.get(3));
+ Assert.assertEquals(1.1, RowValue.get(2));
+ Assert.assertEquals("hello world 1", RowValue.get(1));
+ Assert.assertEquals("hello byte 1", RowValue.get(0).toString());
+ scanner.advance();
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals(false, RowValue.get(5));
+ Assert.assertEquals(2, RowValue.get(4));
+ Assert.assertEquals(1002L, RowValue.get(3));
+ Assert.assertEquals(3.1, RowValue.get(2));
+ Assert.assertEquals("hello world 2", RowValue.get(1));
+ Assert.assertEquals("hello byte 2", RowValue.get(0).toString());
+ }
+
+ // test stitch
+ @Test
+ public void testReadSimpleStitch() throws IOException, ParseException {
+ String projection2 = new String("s5, s1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection2);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals("hello world 1", RowValue.get(0));
+ Assert.assertEquals(true, RowValue.get(1));
+
+ scanner.advance();
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(RowValue);
+ Assert.assertEquals("hello world 2", RowValue.get(0));
+ Assert.assertEquals(false, RowValue.get(1));
+
+ reader.close();
+ }
+
+}
\ No newline at end of file
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * Test projections on complicated column types.
+ *
+ */
+public class TestWrite {
+ final static String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)), m1:map(string),m2:map(map(int)), c:collection(f13:double, f14:float, f15:bytes)";
+ final static String STR_STORAGE = "[s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3]; [s5, s6, m2#{x|y}]; [r1.f2, m1#{b}]; [r2.r3.f4, m2#{z}]";
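+ // Seven physical column groups, mixing scalar columns, record fields
+ // (r1.f1, r2.r3.f3) and individual map keys (m1#{a}, m2#{x|y}).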
+ private static Configuration conf;
+ private static Path path;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), "TestWrite");
+ fs = path.getFileSystem(conf);
+ // drop any previous tables
+ BasicTable.drop(path, conf);
+
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+ writer.finish();
+
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+ int part = 0;
+ TableInserter inserter = writer1.getInserter("part" + part, true);
+ TypesUtils.resetTuple(tuple);
+
+ Tuple tupRecord1;
+ try {
+ tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+ .getSchema());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ Tuple tupRecord2;
+ try {
+ tupRecord2 = TypesUtils.createTuple(schema.getColumnSchema("r2")
+ .getSchema());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ Tuple tupRecord3;
+ try {
+ tupRecord3 = TypesUtils.createTuple(new Schema("f3:float, f4"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ // row 1
+ tuple.set(0, true); // bool
+ tuple.set(1, 1); // int
+ tuple.set(2, 1001L); // long
+ tuple.set(3, 1.1); // float
+ tuple.set(4, "hello world 1"); // string
+ tuple.set(5, new DataByteArray("hello byte 1")); // byte
+
+ // r1:record(f1:int, f2:long
+ tupRecord1.set(0, 1);
+ tupRecord1.set(1, 1001L);
+ tuple.set(6, tupRecord1);
+
+ // r2:record(r3:record(f3:float, f4))
+ tupRecord2.set(0, tupRecord3);
+ tupRecord3.set(0, 1.3);
+ tupRecord3.set(1, new DataByteArray("r3 row 1 byte array "));
+ tuple.set(7, tupRecord2);
+
+ // m1:map(string)
+ Map<String, String> m1 = new HashMap<String, String>();
+ m1.put("a", "A");
+ m1.put("b", "B");
+ m1.put("c", "C");
+ tuple.set(8, m1);
+
+ // m2:map(map(int))
+ HashMap<String, Map> m2 = new HashMap<String, Map>();
+ Map<String, Integer> m3 = new HashMap<String, Integer>();
+ m3.put("m311", 311);
+ m3.put("m321", 321);
+ m3.put("m331", 331);
+ Map<String, Integer> m4 = new HashMap<String, Integer>();
+ m4.put("m411", 411);
+ m4.put("m421", 421);
+ m4.put("m431", 431);
+ m2.put("x", m3);
+ m2.put("y", m4);
+ tuple.set(9, m2);
+
+ // c:collection(f13:double, f14:float, f15:bytes)
+ DataBag bagColl = TypesUtils.createBag();
+ Schema schColl = schema.getColumn(10).getSchema();
+ Tuple tupColl1 = TypesUtils.createTuple(schColl);
+ Tuple tupColl2 = TypesUtils.createTuple(schColl);
+ byte[] abs1 = new byte[3];
+ byte[] abs2 = new byte[4];
+ tupColl1.set(0, 3.1415926);
+ tupColl1.set(1, 1.6);
+ abs1[0] = 11;
+ abs1[1] = 12;
+ abs1[2] = 13;
+ tupColl1.set(2, new DataByteArray(abs1));
+ bagColl.add(tupColl1);
+ tupColl2.set(0, 123.456789);
+ tupColl2.set(1, 100);
+ abs2[0] = 21;
+ abs2[1] = 22;
+ abs2[2] = 23;
+ abs2[3] = 24;
+ tupColl2.set(2, new DataByteArray(abs2));
+ bagColl.add(tupColl2);
+ tuple.set(10, bagColl);
+
+ int row = 0;
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ // row 2
+ row++;
+ TypesUtils.resetTuple(tuple);
+ TypesUtils.resetTuple(tupRecord1);
+ TypesUtils.resetTuple(tupRecord2);
+ TypesUtils.resetTuple(tupRecord3);
+ m1.clear();
+ m2.clear();
+ m3.clear();
+ m4.clear();
+ tuple.set(0, false);
+ tuple.set(1, 2); // int
+ tuple.set(2, 1002L); // long
+ tuple.set(3, 3.1); // float
+ tuple.set(4, "hello world 2"); // string
+ tuple.set(5, new DataByteArray("hello byte 2")); // byte
+
+ // r1:record(f1:int, f2:long
+ tupRecord1.set(0, 2);
+ tupRecord1.set(1, 1002L);
+ tuple.set(6, tupRecord1);
+
+ // r2:record(r3:record(f3:float, f4))
+ tupRecord2.set(0, tupRecord3);
+ tupRecord3.set(0, 2.3);
+ tupRecord3.set(1, new DataByteArray("r3 row2 byte array"));
+ tuple.set(7, tupRecord2);
+
+ // m1:map(string)
+ m1.put("a2", "A2");
+ m1.put("b2", "B2");
+ m1.put("c2", "C2");
+ tuple.set(8, m1);
+
+ // m2:map(map(int))
+ m3.put("m321", 321);
+ m3.put("m322", 322);
+ m3.put("m323", 323);
+ m2.put("z", m3);
+ tuple.set(9, m2);
+
+ // c:collection(f13:double, f14:float, f15:bytes)
+ bagColl.clear();
+ TypesUtils.resetTuple(tupColl1);
+ TypesUtils.resetTuple(tupColl2);
+ tupColl1.set(0, 7654.321);
+ tupColl1.set(1, 0.0001);
+ abs1[0] = 31;
+ abs1[1] = 32;
+ abs1[2] = 33;
+ tupColl1.set(2, new DataByteArray(abs1));
+ bagColl.add(tupColl1);
+ tupColl2.set(0, 0.123456789);
+ tupColl2.set(1, 0.3333);
+ abs2[0] = 41;
+ abs2[1] = 42;
+ abs2[2] = 43;
+ abs2[3] = 44;
+ tupColl2.set(2, new DataByteArray(abs2));
+ bagColl.add(tupColl2);
+ tuple.set(10, bagColl);
+
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ inserter.close();
+ writer1.finish();
+
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ }
+
+ @Test
+ public void test1() throws IOException, ParseException {
+ String projection = new String("r1.f2, s1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ // long totalBytes = reader.getStatus().getSize();
+
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ reader.close();
+ reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(value);
+
+ Assert.assertEquals(1001L, value.get(0));
+ Assert.assertEquals(true, value.get(1));
+
+ scanner.advance();
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ scanner.getValue(value);
+ Assert.assertEquals(1002L, value.get(0));
+ Assert.assertEquals(false, value.get(1));
+
+ reader.close();
+ }
+
+ @Test
+ public void testStitch() throws IOException, ParseException {
+ String projection = new String("s1, r1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ // long totalBytes = reader.getStatus().getSize();
+
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ reader.close();
+ reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(value);
+
+ Tuple recordTuple = (Tuple) value.get(1);
+ Assert.assertEquals(1, recordTuple.get(0));
+ Assert.assertEquals(1001L, recordTuple.get(1));
+ Assert.assertEquals(true, value.get(0));
+ reader.close();
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.Flat;
+
+/**
+ * Generate some input text files.
+ */
+class ArticleGenerator {
+ Random random;
+ Dictionary dict;
+ int pageWidth;
+ DiscreteRNG lastLineLenGen;
+ DiscreteRNG paragraphLineLenGen;
+ DiscreteRNG paragraphLenGen;
+ long wordCount;
+ long lineCount;
+
+ /**
+ * Create an article generator.
+ *
+ * @param dictWordCnt
+ * Number of words in the dictionary.
+ * @param minWordLen
+ * Minimum word length
+ * @param maxWordLen
+ * Maximum word length
+ * @param lineWidth
+ * Line width.
+ */
+ ArticleGenerator(int dictWordCnt, int minWordLen, int maxWordLen,
+ int pageWidth) {
+ random = new Random(System.nanoTime());
+ dict = new Dictionary(random, dictWordCnt, minWordLen, maxWordLen, 100);
+ this.pageWidth = pageWidth;
+ lastLineLenGen = new Flat(random, 1, pageWidth);
+ paragraphLineLenGen = new Flat(random, pageWidth * 3 / 4, pageWidth);
+ paragraphLenGen = new Flat(random, 1, 40);
+ }
+
+ /**
+ * Create an article
+ *
+ * @param fs
+ * File system.
+ * @param path
+ * path of the file
+ * @param length
+ * Expected size of the file.
+ * @throws IOException
+ */
+ void createArticle(FileSystem fs, Path path, long length) throws IOException {
+ FSDataOutputStream fsdos = fs.create(path, false);
+ StringBuilder sb = new StringBuilder();
+ int remainLinesInParagraph = paragraphLenGen.nextInt();
+ while (fsdos.getPos() < length) {
+ if (remainLinesInParagraph == 0) {
+ remainLinesInParagraph = paragraphLenGen.nextInt();
+ fsdos.write('\n');
+ }
+ int lineLen = paragraphLineLenGen.nextInt();
+ if (--remainLinesInParagraph == 0) {
+ lineLen = lastLineLenGen.nextInt();
+ }
+ sb.setLength(0);
+ while (sb.length() < lineLen) {
+ if (sb.length() > 0) {
+ sb.append(' ');
+ }
+ sb.append(dict.nextWord());
+ ++wordCount;
+ }
+ sb.append('\n');
+ fsdos.write(sb.toString().getBytes());
+ ++lineCount;
+ }
+ fsdos.close();
+ }
+
+ /**
+ * Create a bunch of files under the same directory.
+ *
+ * @param fs
+ * File system
+ * @param parent
+ * directory where files should be created
+ * @param prefix
+ * prefix name of the files
+ * @param n
+ * total number of files
+ * @param length
+ * length of each file.
+ * @throws IOException
+ */
+ void batchArticalCreation(FileSystem fs, Path parent, String prefix, int n,
+ long length) throws IOException {
+ for (int i = 0; i < n; ++i) {
+ createArticle(fs, new Path(parent, String.format("%s%06d", prefix, i)),
+ length);
+ }
+ }
+
+ static class Summary {
+ long wordCount;
+ long lineCount;
+ Map<String, Long> wordCntDist;
+
+ Summary() {
+ wordCntDist = new HashMap<String, Long>();
+ }
+ }
+
+ void resetSummary() {
+ wordCount = 0;
+ lineCount = 0;
+ dict.resetWordCnts();
+ }
+
+ Summary getSummary() {
+ Summary ret = new Summary();
+ ret.wordCount = wordCount;
+ ret.lineCount = lineCount;
+ ret.wordCntDist = dict.getWordCounts();
+ return ret;
+ }
+}
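
A hedged usage sketch for the generator above; the directory name, counts and
sizes are assumptions. Note the class is package-private, so a caller must
live in the same package.

  package org.apache.hadoop.zebra.mapred;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class ArticleDemo {
    public static void main(String[] args) throws Exception {
      FileSystem fs = FileSystem.getLocal(new Configuration());
      // 1000 dictionary words, word lengths 3..10, ~80-character lines.
      ArticleGenerator gen = new ArticleGenerator(1000, 3, 10, 80);
      // Four ~64KB files named part-000000 .. part-000003.
      gen.batchArticalCreation(fs, new Path("articles"), "part-", 4, 64 * 1024);
      ArticleGenerator.Summary s = gen.getSummary();
      System.out.println(s.lineCount + " lines, " + s.wordCount + " words, "
          + s.wordCntDist.size() + " distinct words");
    }
  }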
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.io.file.tfile.RandomDistribution.Binomial;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.Zipf;
+
+/**
+ * A dictionary that generates English words, whose frequency follows Zipf
+ * distributions, and length follows Binomial distribution.
+ */
+class Dictionary {
+ private static final double BINOMIAL_P = 0.3;
+ private static final double SIGMA = 1.1;
+ private final int lead;
+ private final Zipf zipf;
+ private final String[] dict;
+ private final long[] wordCnts;
+
+ private static String makeWord(DiscreteRNG rng, Random random) {
+ int len = rng.nextInt();
+ StringBuilder sb = new StringBuilder(len);
+ for (int i = 0; i < len; ++i) {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Constructor
+ *
+ * @param entries
+ * How many words exist in the dictionary.
+ * @param minWordLen
+ * Minimum word length.
+ * @param maxWordLen
+ * Maximum word length.
+ * @param freqRatio
+ * Expected ratio between the most frequent words and the least
+ * frequent words. (e.g. 100)
+ */
+ public Dictionary(Random random, int entries, int minWordLen, int maxWordLen,
+ int freqRatio) {
+ Binomial binomial = new Binomial(random, minWordLen, maxWordLen, BINOMIAL_P);
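+ // Shift all Zipf ranks by 'lead' so that the expected ratio between the
+ // most and least frequent words is approximately freqRatio:
+ // ((entries + lead) / (lead + 1))^SIGMA ~= freqRatio.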
+ lead = Math.max(0,
+ (int) (entries / (Math.exp(Math.log(freqRatio) / SIGMA) - 1)) - 1);
+ zipf = new Zipf(random, lead, entries + lead, SIGMA);
+ dict = new String[entries];
+ // Use a set to ensure no dup words in dictionary
+ Set<String> dictTmp = new HashSet<String>();
+ for (int i = 0; i < entries; ++i) {
+ while (true) {
+ String word = makeWord(binomial, random);
+ if (!dictTmp.contains(word)) {
+ dictTmp.add(word);
+ dict[i] = word;
+ break;
+ }
+ }
+ }
+ wordCnts = new long[dict.length];
+ }
+
+ /**
+ * Get the next word from the dictionary.
+ *
+ * @return The next word from the dictionary.
+ */
+ public String nextWord() {
+ int index = zipf.nextInt() - lead;
+ ++wordCnts[index];
+ return dict[index];
+ }
+
+ public void resetWordCnts() {
+ for (int i = 0; i < wordCnts.length; ++i) {
+ wordCnts[i] = 0;
+ }
+ }
+
+ public Map<String, Long> getWordCounts() {
+ Map<String, Long> ret = new HashMap<String, Long>();
+ for (int i = 0; i < dict.length; ++i) {
+ if (wordCnts[i] > 0) {
+ ret.put(dict[i], wordCnts[i]);
+ }
+ }
+ return ret;
+ }
+}
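
A hedged sketch of how the Zipf behavior could be spot-checked; the sample
size and parameters are assumptions. Like ArticleGenerator, Dictionary is
package-private, so the check must sit in the same package.

  package org.apache.hadoop.zebra.mapred;

  import java.util.Collections;
  import java.util.Map;
  import java.util.Random;

  public class DictionaryDemo {
    public static void main(String[] args) {
      // 100 words, lengths 3..10, target most/least frequency ratio of 100.
      Dictionary dict = new Dictionary(new Random(1), 100, 3, 10, 100);
      for (int i = 0; i < 1000000; ++i) {
        dict.nextWord();
      }
      Map<String, Long> counts = dict.getWordCounts();
      long max = Collections.max(counts.values());
      long min = Collections.min(counts.values());
      // Expect max/min on the order of the requested freqRatio.
      System.out.println("ratio = " + (max / (double) min));
    }
  }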
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This is a complete sample MR program for Table. It doesn't contain the
+ * 'read' part, but that should be similar and even easier to write. Refer to
+ * the test cases in the same directory.
+ *
+ * Assume the input files contain rows of word and count, separated by a space:
+ *
+ * <pre>
+ * this 2
+ * is 1
+ * a 4
+ * test 2
+ * hello 1
+ * world 3
+ * </pre>
+ *
+ */
+public class TableMRSample {
+ static class MapClass implements
+ Mapper<LongWritable, Text, BytesWritable, Tuple> {
+ private BytesWritable bytesKey;
+ private Tuple tupleRow;
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ throws IOException {
+
+ // value should contain "word count"
+ String[] wdct = value.toString().split(" ");
+ if (wdct.length != 2) {
+ // LOG the error
+ return;
+ }
+
+ byte[] word = wdct[0].getBytes();
+ bytesKey.set(word, 0, word.length);
+ tupleRow.set(0, new String(word));
+ tupleRow.set(1, Integer.parseInt(wdct[1]));
+
+ output.collect(bytesKey, tupleRow);
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ bytesKey = new BytesWritable();
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ tupleRow = TypesUtils.createTuple(outSchema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+ }
+
+ public static void main(String[] args) throws ParseException, IOException {
+ JobConf jobConf = new JobConf();
+ jobConf.setJobName("tableMRSample");
+ jobConf.set("table.output.tfile.compression", "gz");
+
+ // input settings
+ jobConf.setInputFormat(TextInputFormat.class);
+ jobConf.setMapperClass(TableMRSample.MapClass.class);
+ FileInputFormat.setInputPaths(jobConf, new Path(
+ "/user/joe/inputdata/input.txt"));
+ jobConf.setNumMapTasks(2);
+
+ // output settings
+ Path outPath = new Path("/user/joe/outputdata/");
+ jobConf.setOutputFormat(BasicTableOutputFormat.class);
+ BasicTableOutputFormat.setOutputPath(jobConf, outPath);
+ // set the logical schema with 2 columns
+ BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
+ // for demo purposes, create 2 physical column groups
+ BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
+
+ // set map-only job.
+ jobConf.setNumReduceTasks(0);
+ JobClient.runJob(jobConf);
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample1.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This is a complete sample MR program for Table. It doesn't contain the
+ * 'read' part, but that should be similar and even easier to write. Refer to
+ * the test cases in the same directory.
+ *
+ * Assume the input files contain rows of word and count, separated by a space:
+ *
+ * <pre>
+ * this 2
+ * is 1
+ * a 4
+ * test 2
+ * hello 1
+ * world 3
+ * </pre>
+ *
+ */
+public class TableMRSample1 {
+ static class MapClass implements
+ Mapper<LongWritable, Text, BytesWritable, Tuple> {
+ private BytesWritable bytesKey;
+ private Tuple tupleRow;
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ throws IOException {
+
+ // value should contain "word count"
+ String[] wdct = value.toString().split(" ");
+ if (wdct.length != 2) {
+ // reject the malformed input line
+ throw new IOException("Does not contain two fields");
+ }
+
+ byte[] word = wdct[0].getBytes();
+ bytesKey.set(word, 0, word.length);
+ tupleRow.set(0, new String(word));
+ tupleRow.set(1, Integer.parseInt(wdct[1]));
+
+ output.collect(bytesKey, tupleRow);
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ bytesKey = new BytesWritable();
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ tupleRow = TypesUtils.createTuple(outSchema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ }
+
+ public static void main(String[] args) throws ParseException, IOException {
+ JobConf jobConf = new JobConf();
+ jobConf.setJobName("tableMRSample");
+ jobConf.set("table.output.tfile.compression", "gz");
+
+ // input settings
+ jobConf.setInputFormat(TextInputFormat.class);
+ jobConf.setMapperClass(TableMRSample1.MapClass.class);
+ FileInputFormat.setInputPaths(jobConf, new Path("/user/joe/input.txt"));
+ jobConf.setNumMapTasks(2);
+
+ // output settings
+ Path outPath = new Path("/user/joe/tableout");
+ jobConf.setOutputFormat(BasicTableOutputFormat.class);
+ BasicTableOutputFormat.setOutputPath(jobConf, outPath);
+ // set the logical schema with 2 columns
+ BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
+ // for demo purposes, create 2 physical column groups
+ BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
+
+ // set map-only job.
+ jobConf.setNumReduceTasks(0);
+ JobClient.runJob(jobConf);
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This sample shows how to use Zebra tables to perform a simple union in
+ * map/reduce. To run it, two basic tables must already exist; they contain
+ * the same data as in Sample 1, i.e., (word, count). In this example they
+ * live at /homes/chaow/mapredu/t1 and /homes/chaow/mapredu/t2, and the
+ * resulting table is written to /homes/chaow/mapredu2/t1.
+ *
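+ * A typical invocation, assuming the class is packaged into a jar (the jar
+ * name here is illustrative):
+ *
+ *   hadoop jar zebra-tests.jar org.apache.hadoop.zebra.mapred.TableMRSample2
+ *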
+ */
+public class TableMRSample2 {
+ static class MapClass implements
+ Mapper<BytesWritable, Tuple, BytesWritable, Tuple> {
+ private BytesWritable bytesKey;
+ private Tuple tupleRow;
+
+ @Override
+ public void map(BytesWritable key, Tuple value,
+ OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ throws IOException {
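+ // Identity map: log each row for debugging, then pass it through unchanged.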
+ System.out.println(key.toString() + value.toString());
+ output.collect(key, value);
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ bytesKey = new BytesWritable();
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ tupleRow = TypesUtils.createTuple(outSchema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ public static void main(String[] args) throws ParseException, IOException {
+ JobConf jobConf = new JobConf();
+ jobConf.setJobName("tableMRSample");
+ jobConf.set("table.output.tfile.compression", "gz");
+
+ // input settings
+ jobConf.setInputFormat(TableInputFormat.class);
+ jobConf.setOutputFormat(BasicTableOutputFormat.class);
+ jobConf.setMapperClass(TableMRSample2.MapClass.class);
+
+ List<Path> paths = new ArrayList<Path>(2);
+ Path p = new Path("/homes/chaow/mapredu/t1");
+ System.out.println("path = " + p);
+ paths.add(p);
+ p = new Path("/homes/chaow/mapredu/t2");
+ paths.add(p);
+
+ TableInputFormat.setInputPaths(jobConf, paths.toArray(new Path[2]));
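+ // Read only the "word" column from each input table; the union of the two
+ // projected tables feeds the mapper.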
+ TableInputFormat.setProjection(jobConf, "word");
+ BasicTableOutputFormat.setOutputPath(jobConf, new Path(
+ "/homes/chaow/mapredu2/t1"));
+
+ BasicTableOutputFormat.setSchema(jobConf, "word:string");
+ BasicTableOutputFormat.setStorageHint(jobConf, "[word]");
+
+ // set map-only job.
+ jobConf.setNumReduceTasks(0);
+ jobConf.setNumMapTasks(2);
+ JobClient.runJob(jobConf);
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * <code>TableMapReduceExample<code> is a map-reduce example for Table Input/Output Format.
+ * <p/>
+ * Schema for Table is set to two columns containing Word of type <i>string</i> and Count of type <i>int</i> using <code> BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int"); </code>
+ * <p/>
+ * Hint for creation of Column Groups is specified using
+ * <code> BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]"); </code>
+ * . Here we have two column groups.
+ * <p/>
+ * Input file should contain rows of word and count, separated by a space. For
+ * example:
+ *
+ * <pre>
+ * this 2
+ * is 1
+ * a 5
+ * test 2
+ * hello 1
+ * world 3
+ * </pre>
+ * <p/>
+ * <p>
+ * Second job reads output from the first job which is in Table Format. Here we
+ * specify <i>count</i> as projection column. Table Input Format projects in put
+ * row which has both word and count into a row containing only the count column
+ * and hands it to map.
+ * <p/>
+ * Reducer sums the counts and produces a sum of counts which should match total
+ * number of words in original text.
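+ * <p/>
+ * A typical invocation via {@link ToolRunner} (the jar name and paths below
+ * are illustrative):
+ *
+ * <pre>
+ * hadoop jar zebra-tests.jar org.apache.hadoop.zebra.mapred.TableMapReduceExample \
+ *     input.txt /user/joe/wordtable /user/joe/wordsum
+ * </pre>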
+ */
+
+public class TableMapReduceExample extends Configured implements Tool {
+
+ static class Map extends MapReduceBase implements
+ Mapper<LongWritable, Text, BytesWritable, Tuple> {
+ private BytesWritable bytesKey;
+ private Tuple tupleRow;
+
+ /**
+ * Map method for reading input.
+ */
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ throws IOException {
+
+ // value should contain "word count"
+ String[] wordCount = value.toString().split(" ");
+ if (wordCount.length != 2) {
+ // Reject malformed lines rather than silently dropping them.
+ throw new IOException("Value does not contain two fields: " + value);
+ }
+
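+ // The word bytes become the sortable table key; the row is (word, count).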
+ byte[] word = wordCount[0].getBytes();
+ bytesKey.set(word, 0, word.length);
+ tupleRow.set(0, wordCount[0]);
+ tupleRow.set(1, Integer.parseInt(wordCount[1]));
+
+ output.collect(bytesKey, tupleRow);
+ }
+
+ /**
+ * Configuration of the job. Here we create an empty Tuple Row.
+ */
+ @Override
+ public void configure(JobConf job) {
+ bytesKey = new BytesWritable();
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ tupleRow = TypesUtils.createTuple(outSchema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ static class ProjectionMap extends MapReduceBase implements
+ Mapper<BytesWritable, Tuple, Text, IntWritable> {
+ private final static Text all = new Text("All");
+
+ /**
+ * Map method which gets count column after projection.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void map(BytesWritable key, Tuple value,
+ OutputCollector<Text, IntWritable> output, Reporter reporter)
+ throws IOException {
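+ // After projection, field 0 of the tuple is the "count" column.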
+ output.collect(all, new IntWritable((Integer) value.get(0)));
+ }
+ }
+
+ public static class ProjectionReduce extends MapReduceBase implements
+ Reducer<Text, IntWritable, Text, IntWritable> {
+ /**
+ * Reduce method which implements summation. Acts as both reducer and
+ * combiner.
+ *
+ * @throws IOException
+ */
+ public void reduce(Text key, Iterator<IntWritable> values,
+ OutputCollector<Text, IntWritable> output, Reporter reporter)
+ throws IOException {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+ output.collect(key, new IntWritable(sum));
+ }
+ }
+
+ /**
+ * Sets up both jobs, their configuration, and the order in which they run.
+ *
+ * @param args
+ * command-line arguments, minus the generic options already consumed
+ * by {@link ToolRunner}.
+ */
+ public int run(String[] args) throws Exception {
+ if (args == null || args.length != 3) {
+ System.out
+ .println("usage: TableMapReduceExample input_path_for_text_file output_path_for_table output_path_for_text_file");
+ return -1;
+ }
+
+ /*
+ * First MR Job creating a Table with two columns
+ */
+ JobConf jobConf = new JobConf();
+ jobConf.setJobName("TableMapReduceExample");
+ jobConf.set("table.output.tfile.compression", "none");
+
+ // Input settings
+ jobConf.setInputFormat(TextInputFormat.class);
+ jobConf.setMapperClass(Map.class);
+ FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
+
+ // Output settings
+ jobConf.setOutputFormat(BasicTableOutputFormat.class);
+ BasicTableOutputFormat.setOutputPath(jobConf, new Path(args[1]));
+
+ // set the logical schema with 2 columns
+ BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
+
+ // for demo purposes, create 2 physical column groups
+ BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
+
+ // set map-only job.
+ jobConf.setNumReduceTasks(0);
+
+ // Run Job
+ JobClient.runJob(jobConf);
+
+ /*
+ * Second MR Job for Table Projection of count column
+ */
+ JobConf projectionJobConf = new JobConf();
+ projectionJobConf.setJobName("TableProjectionMapReduceExample");
+
+ // Input settings
+ projectionJobConf.setMapperClass(ProjectionMap.class);
+ projectionJobConf.setInputFormat(TableInputFormat.class);
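+ // Project each row down to just the "count" column before it reaches the map.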
+ TableInputFormat.setProjection(projectionJobConf, "count");
+ TableInputFormat.setInputPaths(projectionJobConf, new Path(args[1]));
+ projectionJobConf.setMapOutputKeyClass(Text.class);
+ projectionJobConf.setMapOutputValueClass(IntWritable.class);
+
+ // Output settings
+ projectionJobConf.setOutputFormat(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(projectionJobConf, new Path(args[2]));
+ projectionJobConf.setReducerClass(ProjectionReduce.class);
+ projectionJobConf.setCombinerClass(ProjectionReduce.class);
+
+ // Run Job
+ JobClient.runJob(projectionJobConf);
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new TableMapReduceExample(),
+ args);
+ System.exit(res);
+ }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFMoreLocal.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Testing BasicTableOutputFormat and TableInputFormat using LocalFS with larger
+ * settings.
+ */
+public class TestBasicTableIOFMoreLocal extends TestBasicTableIOFormatLocalFS {
+ @Override
+ protected void setUp() throws IOException {
+ LOG = LogFactory.getLog(TestBasicTableIOFMoreLocal.class.getName());
+
+ if (options == null) {
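+ // Scale up the base IO-format test: more source files and more parallel tasks.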
+ options = new Options();
+ options.srcFiles = 20;
+ options.numBatches = 1;
+ options.numMapper = 5;
+ options.numReducer = 4;
+ }
+
+ super.setUp();
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatDFS.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Testing BasicTableOutputFormat and TableInputFormat using DFS
+ */
+public class TestBasicTableIOFormatDFS extends TestBasicTableIOFormatLocalFS {
+ @Override
+ protected void setUp() throws IOException {
+ LOG = LogFactory.getLog(TestBasicTableIOFormatDFS.class.getName());
+
+ if (options == null) {
+ options = new Options();
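+ // Run the same test suite against DFS instead of the local file system.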
+ options.localFS = false;
+ }
+
+ super.setUp();
+ }
+}