You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC
svn commit: r803312 [9/16] - in /hadoop/pig/trunk: ./ contrib/zebra/
contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/
contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/
contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/s...
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,1021 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * Test projections on complicated column types.
+ *
+ */
+public class TestCollection {
+
+ final static String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ final static String STR_STORAGE = "[c]";
+ private static Configuration conf;
+ private static Path path;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), "TestCollection");
+ fs = path.getFileSystem(conf);
+ // drop any previous tables
+ BasicTable.drop(path, conf);
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+
+ writer.finish();
+
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+
+ BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+ int part = 0;
+ TableInserter inserter = writer1.getInserter("part" + part, true);
+ TypesUtils.resetTuple(tuple);
+ DataBag bag1 = TypesUtils.createBag();
+ Schema schColl = schema.getColumn(0).getSchema();
+ Tuple tupColl1 = TypesUtils.createTuple(schColl);
+ Tuple tupColl2 = TypesUtils.createTuple(schColl);
+
+ DataBag bag2 = TypesUtils.createBag();
+ Schema schColl2 = schema.getColumn(1).getSchema();
+ Tuple tupColl2_1 = TypesUtils.createTuple(schColl2);
+ Tuple tupColl2_2 = TypesUtils.createTuple(schColl2);
+ Tuple collRecord1;
+ try {
+ collRecord1 = TypesUtils.createTuple(new Schema("f1:int, f2:string"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Tuple collRecord2;
+ try {
+ collRecord2 = TypesUtils.createTuple(new Schema("f1:int, f2:string"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ // c3:collection(c3_1:collection(e:int,f:bool))
+ DataBag bag3 = TypesUtils.createBag();
+ Schema schColl3 = schema.getColumn(2).getSchema();
+ DataBag bag3_1 = TypesUtils.createBag();
+ DataBag bag3_2 = TypesUtils.createBag();
+
+ Tuple tupColl3_1 = null;
+ try {
+ tupColl3_1 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Tuple tupColl3_2;
+ try {
+ tupColl3_2 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ Tuple tupColl3_3 = null;
+ try {
+ tupColl3_3 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Tuple tupColl3_4;
+ try {
+ tupColl3_4 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ byte[] abs1 = new byte[3];
+ byte[] abs2 = new byte[4];
+ tupColl1.set(0, 3.1415926);
+ tupColl1.set(1, 1.6);
+ abs1[0] = 11;
+ abs1[1] = 12;
+ abs1[2] = 13;
+ tupColl1.set(2, new DataByteArray(abs1));
+ bag1.add(tupColl1);
+ tupColl2.set(0, 123.456789);
+ tupColl2.set(1, 100);
+ abs2[0] = 21;
+ abs2[1] = 22;
+ abs2[2] = 23;
+ abs2[3] = 24;
+ tupColl2.set(2, new DataByteArray(abs2));
+ bag1.add(tupColl2);
+ tuple.set(0, bag1);
+
+ collRecord1.set(0, 1);
+ collRecord1.set(1, "record1_string1");
+ tupColl2_1.set(0, collRecord1);
+ tupColl2_1.set(1, "hello1");
+ bag2.add(tupColl2_1);
+
+ collRecord2.set(0, 2);
+ collRecord2.set(1, "record2_string1");
+ tupColl2_2.set(0, collRecord2);
+ tupColl2_2.set(1, "hello2");
+ bag2.add(tupColl2_2);
+ tuple.set(1, bag2);
+
+ TypesUtils.resetTuple(tupColl3_1);
+ TypesUtils.resetTuple(tupColl3_2);
+ tupColl3_1.set(0, 1);
+ tupColl3_1.set(1, true);
+ tupColl3_2.set(0, 2);
+ tupColl3_2.set(1, false);
+ bag3_1.add(tupColl3_1);
+ bag3_1.add(tupColl3_2);
+ bag3.addAll(bag3_1);
+
+ tupColl3_3.set(0, 3);
+ tupColl3_3.set(1, true);
+ tupColl3_4.set(0, 4);
+ tupColl3_4.set(1, false);
+ bag3_2.add(tupColl3_3);
+ bag3_2.add(tupColl3_4);
+ bag3.addAll(bag3_2);
+ tuple.set(2, bag3);
+
+ int row = 0;
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ row++;
+
+ bag1.clear();
+ bag2.clear();
+ bag3.clear();
+ bag3_1.clear();
+ bag3_2.clear();
+ TypesUtils.resetTuple(tupColl1);
+ TypesUtils.resetTuple(tupColl2);
+ TypesUtils.resetTuple(tupColl2_1);
+ TypesUtils.resetTuple(tupColl2_2);
+ TypesUtils.resetTuple(collRecord1);
+ TypesUtils.resetTuple(collRecord2);
+ TypesUtils.resetTuple(tupColl3_1);
+ TypesUtils.resetTuple(tupColl3_2);
+ TypesUtils.resetTuple(tupColl3_3);
+ TypesUtils.resetTuple(tupColl3_4);
+
+ tupColl1.set(0, 7654.321);
+ tupColl1.set(1, 0.0001);
+ abs1[0] = 31;
+ abs1[1] = 32;
+ abs1[2] = 33;
+ tupColl1.set(2, new DataByteArray(abs1));
+ bag1.add(tupColl1);
+ tupColl2.set(0, 0.123456789);
+ tupColl2.set(1, 0.3333);
+ abs2[0] = 41;
+ abs2[1] = 42;
+ abs2[2] = 43;
+ abs2[3] = 44;
+ tupColl2.set(2, new DataByteArray(abs2));
+ bag1.add(tupColl2);
+ tuple.set(0, bag1);
+
+ collRecord1.set(0, 3);
+ collRecord1.set(1, "record1_string2");
+ tupColl2_1.set(0, collRecord1);
+ tupColl2_1.set(1, "hello1_2");
+ bag2.add(tupColl2_1);
+
+ collRecord2.set(0, 4);
+ collRecord2.set(1, "record2_string2");
+ tupColl2_2.set(0, collRecord2);
+ tupColl2_2.set(1, "hello2_2");
+ bag2.add(tupColl2_2);
+ tuple.set(1, bag2);
+
+ tupColl3_1.set(0, 5);
+ tupColl3_1.set(1, true);
+ tupColl3_2.set(0, 6);
+ tupColl3_2.set(1, false);
+ bag3_1.add(tupColl3_1);
+ bag3_1.add(tupColl3_2);
+ bag3.addAll(bag3_1);
+
+ tupColl3_3.set(0, 7);
+ tupColl3_3.set(1, true);
+ tupColl3_4.set(0, 8);
+ tupColl3_4.set(1, false);
+ bag3_2.add(tupColl3_3);
+ bag3_2.add(tupColl3_4);
+ bag3.addAll(bag3_2);
+ tuple.set(2, bag3);
+
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+ .getBytes()), tuple);
+
+ inserter.close();
+ writer1.finish();
+
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ BasicTable.drop(path, conf);
+ }
+
+ // read one collection
+ // final static String STR_SCHEMA =
+ // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ @Test
+ public void testRead1() throws IOException, ParseException {
+ String projection = new String("c");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("test read 1: row: " + RowValue.toString());
+ // test read 1: row: ({(3.1415926,1.6, ),(123.456789,100,)})
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(3.1415926, cur.get(0));
+ Assert.assertEquals(1.6, cur.get(1));
+ System.out
+ .println("byte 0: " + ((DataByteArray) cur.get(2)).toString());
+
+ }
+ if (list == 2) {
+ Assert.assertEquals(123.456789, cur.get(0));
+ Assert.assertEquals(100, cur.get(1));
+ // Assert.assertEquals(3.1415926, cur.get(2));
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+ int list2 = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list2++;
+ if (list2 == 1) {
+ Assert.assertEquals(7654.321, cur.get(0));
+ Assert.assertEquals(0.0001, cur.get(1));
+ // Assert.assertEquals(3.1415926, cur.get(2));
+ }
+ if (list2 == 2) {
+ Assert.assertEquals(0.123456789, cur.get(0));
+ Assert.assertEquals(0.3333, cur.get(1));
+ // Assert.assertEquals(3.1415926, cur.get(2));
+ }
+ }
+
+ reader.close();
+ }
+
+ // read second collection
+ // final static String STR_SCHEMA =
+ // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ @Test
+ public void testRead2() throws IOException, ParseException {
+ String projection = new String("c2");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("test read 2:row: " + RowValue.toString());
+ // test read 2:row:
+ // ({((1,record1_string1),hello1),((2,record2_string1),hello2)})
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0)); // (1,record1_string1)
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(1, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string1", ((Tuple) cur.get(0)).get(1));
+ Assert.assertEquals("hello1", cur.get(1));
+
+ }
+ if (list == 2) {
+ Assert.assertEquals(2, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string1", ((Tuple) cur.get(0)).get(1));
+ Assert.assertEquals("hello2", cur.get(1));
+
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+ int list2 = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list2++;
+ if (list2 == 1) {
+ Assert.assertEquals(3, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string2", ((Tuple) cur.get(0)).get(1));
+ Assert.assertEquals("hello1_2", cur.get(1));
+ }
+ if (list2 == 2) {
+ Assert.assertEquals(4, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string2", ((Tuple) cur.get(0)).get(1));
+ Assert.assertEquals("hello2_2", cur.get(1));
+ }
+ }
+
+ reader.close();
+ }
+
+ // read 3rd column
+ // final static String STR_SCHEMA =
+ // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ @Test
+ public void testRead3() throws IOException, ParseException {
+ String projection = new String("c3");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("test record 3: row: " + RowValue.toString());
+ // test record 3: row: ({(1,true),(2,false),(3,true),(4,false)})
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0)); // 3
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(1, cur.get(0));
+ Assert.assertEquals(true, cur.get(1));
+ }
+ if (list == 2) {
+ Assert.assertEquals(2, cur.get(0));
+ Assert.assertEquals(false, cur.get(1));
+ }
+ if (list == 3) {
+ Assert.assertEquals(3, cur.get(0));
+ Assert.assertEquals(true, cur.get(1));
+ }
+ if (list == 4) {
+ Assert.assertEquals(4, cur.get(0));
+ Assert.assertEquals(false, cur.get(1));
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ System.out.println("row: " + RowValue.toString());
+ // row: ({(5,true),(6,false),(7,true),(8,false)})
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+ int list2 = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list2++;
+ if (list2 == 1) {
+ Assert.assertEquals(5, cur.get(0));
+ Assert.assertEquals(true, cur.get(1));
+ }
+ if (list2 == 2) {
+ Assert.assertEquals(6, cur.get(0));
+ Assert.assertEquals(false, cur.get(1));
+ }
+ if (list2 == 3) {
+ Assert.assertEquals(7, cur.get(0));
+ Assert.assertEquals(true, cur.get(1));
+ }
+ if (list2 == 4) {
+ Assert.assertEquals(8, cur.get(0));
+ Assert.assertEquals(false, cur.get(1));
+ }
+ }
+
+ reader.close();
+ }
+
+ // Negative none exist column
+ @Test
+ public void xtestReadNeg1() throws IOException, ParseException {
+ String projection = new String("d");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("row: " + RowValue.toString());
+ System.out.println("row1: " + RowValue.get(0));
+ Assert.assertEquals(false, RowValue.isNull());
+ Assert.assertEquals(null, RowValue.get(0));
+ Assert.assertEquals(1, RowValue.size());
+ reader.close();
+ }
+
+ // read, should support project to 2nd level
+ @Test
+ public void testRead5() throws IOException, ParseException {
+ String projection = new String("c.a");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("test read 5: row: " + RowValue.toString());
+ // test read 5: row: ({(3.1415926),(123.456789)})
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(3.1415926, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+
+ }
+ if (list == 2) {
+ Assert.assertEquals(123.456789, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+ int list2 = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list2++;
+ if (list2 == 1) {
+ Assert.assertEquals(7654.321, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list2 == 2) {
+ Assert.assertEquals(0.123456789, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+
+ reader.close();
+ }
+
+ // read, should support project to 2nd level
+ // final static String STR_SCHEMA =
+ // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ @Test
+ public void testRead6() throws IOException, ParseException {
+ String projection = new String("c2.r1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("test read 6 :row: " + RowValue.toString());
+ // test read 6 :row: ({((1,record1_string1)),((2,record2_string1))})
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(1, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+
+ }
+ if (list == 2) {
+ Assert.assertEquals(2, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+ int list2 = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list2++;
+ if (list2 == 1) {
+ Assert.assertEquals(3, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list2 == 2) {
+ Assert.assertEquals(4, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+
+ reader.close();
+ }
+
+ // read, should support project to 3rd level TODO: construct scanner failed
+ // final static String STR_SCHEMA =
+ // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+
+ public void xtestRead7() throws IOException, ParseException {
+ String projection = new String("c2.r1.f1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ scanner = reader.getScanner(splits.get(0), true);
+
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("row: " + RowValue.toString());
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(1, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+
+ }
+ if (list == 2) {
+ Assert.assertEquals(2, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+ int list2 = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list2++;
+ if (list2 == 1) {
+ Assert.assertEquals(3, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list2 == 2) {
+ Assert.assertEquals(4, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+
+ reader.close();
+ }
+
+ // read, should support project to 3rd level TODO: construct scanner failed
+ // final static String STR_SCHEMA =
+ // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+
+ public void xtestRead8() throws IOException, ParseException {
+ String projection = new String("c3.c3_1.e");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = null;
+ scanner = reader.getScanner(splits.get(0), true);
+
+ scanner = reader.getScanner(splits.get(0), true);
+
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("row: " + RowValue.toString());
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(1, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+
+ }
+ if (list == 2) {
+ Assert.assertEquals(2, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(0)).iterator();
+ int list2 = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list2++;
+ if (list2 == 1) {
+ Assert.assertEquals(3, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list2 == 2) {
+ Assert.assertEquals(4, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+
+ reader.close();
+ }
+
+ // read stitch simple + record stitch
+ // final static String STR_SCHEMA =
+ // "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ @Test
+ public void testRead9() throws IOException, ParseException {
+ String projection = new String("c.a, c2.r1");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(RowValue);
+ System.out.println("read 9: " + RowValue.toString());
+ // read 9:
+ // ({(3.1415926),(123.456789)},{((1,record1_string1)),((2,record2_string1))})
+ Iterator<Tuple> it = ((DataBag) RowValue.get(0)).iterator();
+ int list = 0;
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(3.1415926, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list == 2) {
+ Assert.assertEquals(123.456789, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+ Iterator<Tuple> it2 = ((DataBag) RowValue.get(1)).iterator();
+ list = 0;
+ while (it2.hasNext()) {
+ Tuple cur = it2.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(1, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ ((Tuple) cur.get(0)).get(2);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list == 2) {
+ Assert.assertEquals(2, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string1", ((Tuple) cur.get(0)).get(1));
+ try {
+ ((Tuple) cur.get(0)).get(2);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+ scanner.advance();
+ scanner.getValue(RowValue);
+ Iterator<Tuple> it3 = ((DataBag) RowValue.get(0)).iterator();
+ while (it3.hasNext()) {
+ Tuple cur = it3.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(7654.321, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list == 2) {
+ Assert.assertEquals(0.123456789, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+
+ Iterator<Tuple> it4 = ((DataBag) RowValue.get(1)).iterator();
+ list = 0;
+ while (it4.hasNext()) {
+ Tuple cur = it4.next();
+ System.out.println(cur.get(0));
+ list++;
+ if (list == 1) {
+ Assert.assertEquals(3, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record1_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ ((Tuple) cur.get(0)).get(2);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ if (list == 2) {
+ Assert.assertEquals(4, ((Tuple) cur.get(0)).get(0));
+ Assert.assertEquals("record2_string2", ((Tuple) cur.get(0)).get(1));
+ try {
+ ((Tuple) cur.get(0)).get(2);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ try {
+ cur.get(1);
+ Assert.fail("Should throw index out of bounds exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ }
+
+ reader.close();
+ }
+
+ // Negative should not support 2nd level collection split
+ @Test
+ public void testSplit1() throws IOException, ParseException {
+ String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ String STR_STORAGE = "[c.a]";
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), this.getClass().getSimpleName());
+ fs = path.getFileSystem(conf);
+ // drop any previous tables
+ BasicTable.drop(path, conf);
+ try {
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+ Assert.fail("should throw exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+
+ // Negative should not support none_existent column split
+ @Test
+ public void testSplit2() throws IOException, ParseException {
+ String STR_SCHEMA = "c:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ String STR_STORAGE = "[d]";
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), this.getClass().getSimpleName());
+ fs = path.getFileSystem(conf);
+ // drop any previous tables
+ BasicTable.drop(path, conf);
+ try {
+ BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+ STR_STORAGE, false, conf);
+ Assert.fail("should throw exception");
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.KeyDistribution;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Testing ColumnGroup APIs called as if in MapReduce Jobs
+ */
+public class TestColumnGroup {
+ static Configuration conf;
+ static Random random;
+ static Path rootPath;
+ static FileSystem fs;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+ random = new Random(System.nanoTime());
+ rootPath = new Path(System.getProperty("test.build.data",
+ "build/test/data/work-dir"));
+ fs = rootPath.getFileSystem(conf);
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ }
+
+ BytesWritable makeRandomKey(int max) {
+ return makeKey(random.nextInt(max));
+ }
+
+ static BytesWritable makeKey(int i) {
+ return new BytesWritable(String.format("key%09d", i).getBytes());
+ }
+
+ String makeString(String prefix, int max) {
+ return String.format("%s%09d", prefix, random.nextInt(max));
+ }
+
+ int createCG(int parts, int rows, String strSchema, Path path,
+ boolean properClose, boolean sorted, int[] emptyTFiles)
+ throws IOException, ParseException {
+ if (fs.exists(path)) {
+ ColumnGroup.drop(path, conf);
+ }
+
+ Set<Integer> emptyTFileSet = new HashSet<Integer>();
+ if (emptyTFiles != null) {
+ for (int i = 0; i < emptyTFiles.length; ++i) {
+ emptyTFileSet.add(emptyTFiles[i]);
+ }
+ }
+
+ ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, sorted,
+ "pig", "lzo2", false, conf);
+ writer.finish();
+
+ int total = 0;
+ Schema schema = new Schema(strSchema);
+ String colNames[] = schema.getColumns();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ int[] permutation = new int[parts];
+ for (int i = 0; i < parts; ++i) {
+ permutation[i] = i;
+ }
+
+ for (int i = parts - 1; i > 0; --i) {
+ int targetIndex = random.nextInt(i + 1);
+ int tmp = permutation[i];
+ permutation[i] = permutation[targetIndex];
+ permutation[targetIndex] = tmp;
+ }
+
+ for (int i = 0; i < parts; ++i) {
+ writer = new ColumnGroup.Writer(path, conf);
+ TableInserter inserter = writer.getInserter(String.format("part-%06d",
+ permutation[i]), true);
+ if ((rows > 0) && !emptyTFileSet.contains(permutation[i])) {
+ int actualRows = random.nextInt(rows) + rows / 2;
+ for (int j = 0; j < actualRows; ++j, ++total) {
+ BytesWritable key;
+ if (!sorted) {
+ key = makeRandomKey(rows * 10);
+ } else {
+ key = makeKey(total);
+ }
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserter.insert(key, tuple);
+ }
+ }
+ inserter.close();
+ }
+
+ if (properClose) {
+ writer = new ColumnGroup.Writer(path, conf);
+ writer.close();
+ BasicTableStatus status = getStatus(path);
+ Assert.assertEquals(total, status.getRows());
+ }
+
+ return total;
+ }
+
+ static class DupKeyGen {
+ int low, high;
+ int current;
+ boolean grow = true;
+ int index = 0;
+ int count = 0;
+
+ DupKeyGen(int low, int high) {
+ this.low = Math.max(10, low);
+ this.high = Math.max(this.low * 2, high);
+ current = this.low;
+ }
+
+ BytesWritable next() {
+ if (count == 0) {
+ count = nextCount();
+ ++index;
+ }
+ --count;
+ return makeKey(index);
+ }
+
+ int nextCount() {
+ int ret = current;
+ if ((grow && current > high) || (!grow && current < low)) {
+ grow = !grow;
+ }
+ if (grow) {
+ current *= 2;
+ } else {
+ current /= 2;
+ }
+ return ret;
+ }
+ }
+
+ int createCGDupKeys(int parts, int rows, String strSchema, Path path)
+ throws IOException, ParseException {
+ if (fs.exists(path)) {
+ ColumnGroup.drop(path, conf);
+ }
+
+ ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, true,
+ "pig", "lzo2", false, conf);
+ writer.finish();
+
+ int total = 0;
+ DupKeyGen keyGen = new DupKeyGen(10, rows * 3);
+ Schema schema = new Schema(strSchema);
+ String colNames[] = schema.getColumns();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ int[] permutation = new int[parts];
+ for (int i = 0; i < parts; ++i) {
+ permutation[i] = i;
+ }
+
+ for (int i = parts - 1; i > 0; --i) {
+ int targetIndex = random.nextInt(i + 1);
+ int tmp = permutation[i];
+ permutation[i] = permutation[targetIndex];
+ permutation[targetIndex] = tmp;
+ }
+
+ for (int i = 0; i < parts; ++i) {
+ writer = new ColumnGroup.Writer(path, conf);
+ TableInserter inserter = writer.getInserter(String.format("part-%06d",
+ permutation[i]), true);
+ if (rows > 0) {
+ int actualRows = random.nextInt(rows * 2 / 3) + rows * 2 / 3;
+ for (int j = 0; j < actualRows; ++j, ++total) {
+ BytesWritable key = keyGen.next();
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserter.insert(key, tuple);
+ }
+ }
+ inserter.close();
+ }
+
+ writer = new ColumnGroup.Writer(path, conf);
+ writer.close();
+ BasicTableStatus status = getStatus(path);
+ Assert.assertEquals(total, status.getRows());
+
+ return total;
+ }
+
+ void rangeSplitCG(int numSplits, int totalRows, String strProjection,
+ Path path) throws IOException, ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ long totalBytes = reader.getStatus().getSize();
+
+ List<CGRangeSplit> splits = reader.rangeSplit(numSplits);
+ reader.close();
+ int total = 0;
+ for (int i = 0; i < splits.size(); ++i) {
+ reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ total += doReadOnly(reader.getScanner(splits.get(i), true));
+ totalBytes -= reader.getBlockDistribution(splits.get(i)).getLength();
+ }
+ Assert.assertEquals(total, totalRows);
+ Assert.assertEquals(totalBytes, 0L);
+ }
+
+ void doRangeSplit(int[] numSplits, int totalRows, String projection, Path path)
+ throws IOException, ParseException {
+ for (int i : numSplits) {
+ if (i > 0) {
+ rangeSplitCG(i, totalRows, projection, path);
+ }
+ }
+ }
+
+ void keySplitCG(int numSplits, int totalRows, String strProjection, Path path)
+ throws IOException, ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ long totalBytes = reader.getStatus().getSize();
+ KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
+ Assert.assertEquals(totalBytes, keyDistri.length());
+ reader.close();
+ BytesWritable[] keys = null;
+ if (keyDistri.size() >= numSplits) {
+ keyDistri.resize(numSplits);
+ Assert.assertEquals(totalBytes, keyDistri.length());
+ RawComparable[] rawComparables = keyDistri.getKeys();
+ keys = new BytesWritable[rawComparables.length];
+ for (int i = 0; i < keys.length; ++i) {
+ keys[i] = new BytesWritable();
+ keys[i].setSize(rawComparables[i].size());
+ System.arraycopy(rawComparables[i].buffer(),
+ rawComparables[i].offset(), keys[i].get(), 0, rawComparables[i]
+ .size());
+ }
+ } else {
+ int targetSize = Math.min(totalRows / 10, numSplits);
+ // revert to manually cooked up keys.
+ Set<Integer> keySets = new TreeSet<Integer>();
+ while (keySets.size() < targetSize) {
+ keySets.add(random.nextInt(totalRows));
+ }
+ keys = new BytesWritable[targetSize];
+ if (!keySets.isEmpty()) {
+ int j = 0;
+ for (int i : keySets.toArray(new Integer[keySets.size()])) {
+ keys[j] = makeKey(i);
+ ++j;
+ }
+ }
+ }
+
+ int total = 0;
+ for (int i = 0; i < keys.length; ++i) {
+ reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ BytesWritable begin = (i == 0) ? null : keys[i - 1];
+ BytesWritable end = (i == keys.length - 1) ? null : keys[i];
+ total += doReadOnly(reader.getScanner(begin, end, true));
+ }
+ Assert.assertEquals(total, totalRows);
+ }
+
+ void doKeySplit(int[] numSplits, int totalRows, String projection, Path path)
+ throws IOException, ParseException {
+ for (int i : numSplits) {
+ if (i > 0) {
+ keySplitCG(i, totalRows, projection, path);
+ }
+ }
+ }
+
+ BasicTableStatus getStatus(Path path) throws IOException, ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ try {
+ return reader.getStatus();
+ } finally {
+ reader.close();
+ }
+ }
+
+ void doReadWrite(Path path, int parts, int rows, String schema,
+ String projection, boolean properClose, boolean sorted, int[] emptyTFiles)
+ throws IOException, ParseException {
+ int totalRows = createCG(parts, rows, schema, path, properClose, sorted,
+ emptyTFiles);
+ if (rows == 0) {
+ Assert.assertEquals(rows, 0);
+ }
+
+ doRangeSplit(new int[] { 1, 2, parts / 2, parts, 2 * parts }, totalRows,
+ projection, path);
+ if (sorted) {
+ doKeySplit(new int[] { 1, 2, parts / 2, parts, 2 * parts, 10 * parts },
+ totalRows, projection, path);
+ }
+ }
+
+ int doReadOnly(TableScanner scanner) throws IOException, ParseException {
+ int total = 0;
+ BytesWritable key = new BytesWritable();
+ Tuple value = TypesUtils.createTuple(scanner.getSchema());
+ for (; !scanner.atEnd(); scanner.advance()) {
+ ++total;
+ switch (random.nextInt() % 4) {
+ case 0:
+ scanner.getKey(key);
+ break;
+ case 1:
+ scanner.getValue(value);
+ break;
+ case 2:
+ scanner.getKey(key);
+ scanner.getValue(value);
+ break;
+ default: // no-op.
+ }
+ }
+ scanner.close();
+
+ return total;
+ }
+
+ @Test
+ public void testNullSplits() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupNullSplits");
+ int totalRows = createCG(2, 10, "a, b, c", path, true, true, null);
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection("a,d,c,f");
+ Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
+ Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, null,
+ false)));
+ reader.close();
+ }
+
+ @Test
+ public void testNegativeSplits() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestNegativeSplits");
+ int totalRows = createCG(2, 100, "a, b, c", path, true, true, null);
+ rangeSplitCG(-1, totalRows, "a,d,c,f", path);
+ }
+
+ @Test
+ public void testEmptyCG() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
+ doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", false, false, null);
+ doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
+ doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
+ }
+
+ @Test
+ public void testEmptyTFiles() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
+ doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", false, false, null);
+ doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
+ doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);
+ }
+
+ public void testNormalCases() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupNormal");
+ doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
+ doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
+ doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
+ }
+
+ @Test
+ public void testSomeEmptyTFiles() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
+ for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
+ doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
+ emptyTFiles);
+ doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
+ emptyTFiles);
+ doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,
+ emptyTFiles);
+ }
+ }
+
+ int countRows(Path path, String projection) throws IOException,
+ ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ if (projection != null) {
+ reader.setProjection(projection);
+ }
+ int totalRows = 0;
+ TableScanner scanner = reader.getScanner(null, true);
+ for (; !scanner.atEnd(); scanner.advance()) {
+ ++totalRows;
+ }
+ scanner.close();
+ return totalRows;
+ }
+
+ @Test
+ public void testProjection() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupProjection");
+ int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+ Assert.assertEquals(totalRows, countRows(path, null));
+ Assert.assertEquals(totalRows, countRows(path, ""));
+ }
+
+ @Test
+ public void testDuplicateKeys() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupDuplicateKeys");
+ int totalRows = createCGDupKeys(2, 250, "a, b, c", path);
+ doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
+ path);
+ }
+
+ @Test
+ public void testSortedCGKeySplit() throws IOException, ParseException {
+ conf.setInt("table.output.tfile.minBlock.size", 640 * 1024);
+ Path path = new Path(rootPath, "TestSortedCGKeySplit");
+ int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+ doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
+ path);
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupInserters.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestColumnGroupInserters {
+ final static String outputFile = "TestColumnGroupInserters";
+ final static private Configuration conf = new Configuration();
+ private static FileSystem fs;
+ private static Path path;
+ private static ColumnGroup.Writer writer;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ // set default file system to local file system
+ conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+
+ // must set a conf here to the underlying FS, or it barks
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ rawLFS.setConf(conf);
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), outputFile);
+ System.out.println("output file: " + path);
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ finish();
+ }
+
+ @Test
+ public void testInsertNullValues() throws IOException, ParseException {
+ fs.delete(path, true);
+ System.out.println("testInsertNullValues");
+ writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+ true, conf);
+ TableInserter ins = writer.getInserter("part1", true);
+ // Tuple row = TypesUtils.createTuple(writer.getSchema());
+ // ins.insert(new BytesWritable("key".getBytes()), row);
+ ins.close();
+ close();
+ }
+
+ @Test
+ public void testFailureInvalidSchema() throws IOException, ParseException {
+ fs.delete(path, true);
+ System.out.println("testFailureInvalidSchema");
+ writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+ true, conf);
+ TableInserter ins = writer.getInserter("part1", true);
+ Tuple row = TypesUtils.createTuple(Schema.parse("xyz, ijk, def"));
+ try {
+ ins.insert(new BytesWritable("key".getBytes()), row);
+ Assert.fail("Failed to catch diff schemas.");
+ } catch (IOException e) {
+ // noop, expecting exceptions
+ } finally {
+ ins.close();
+ close();
+ }
+ }
+
+ @Test
+ public void testFailureGetInserterAfterWriterClosed() throws IOException,
+ ParseException {
+ fs.delete(path, true);
+ System.out.println("testFailureGetInserterAfterWriterClosed");
+ writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+ true, conf);
+ try {
+ writer.close();
+ TableInserter ins = writer.getInserter("part1", true);
+ Assert.fail("Failed to catch getInsertion after writer closure.");
+ } catch (IOException e) {
+ // noop, expecting exceptions
+ } finally {
+ close();
+ }
+ }
+
+ @Test
+ public void testFailureInsertAfterClose() throws IOException, ExecException,
+ ParseException {
+ fs.delete(path, true);
+ System.out.println("testFailureInsertAfterClose");
+ writer = new ColumnGroup.Writer(path, "abc, def ", false, "pig", "lzo2",
+ true, conf);
+ TableInserter ins = writer.getInserter("part1", true);
+
+ Tuple row = TypesUtils.createTuple(writer.getSchema());
+ row.set(0, new String("val1"));
+
+ SortedMap<String, String> map = new TreeMap<String, String>();
+ map.put("john", "boy");
+ row.set(1, map);
+
+ ins.insert(new BytesWritable("key".getBytes()), row);
+
+ ins.close();
+ writer.close();
+
+ try {
+ TableInserter ins2 = writer.getInserter("part2", true);
+ Assert.fail("Failed to catch insertion after closure.");
+ } catch (IOException e) {
+ // noop, expecting exceptions
+ } finally {
+ close();
+ }
+ }
+
+ @Test
+ public void testFailureInsertXtraColumn() throws IOException, ExecException,
+ ParseException {
+ fs.delete(path, true);
+ System.out.println("testFailureInsertXtraColumn");
+ writer = new ColumnGroup.Writer(path, "abc ", false, "pig", "lzo2", true,
+ conf);
+ TableInserter ins = writer.getInserter("part1", true);
+
+ try {
+ Tuple row = TypesUtils.createTuple(writer.getSchema());
+ row.set(0, new String("val1"));
+
+ SortedMap<String, String> map = new TreeMap<String, String>();
+ map.put("john", "boy");
+ row.set(1, map);
+ Assert
+ .fail("Failed to catch insertion an extra column not defined in schema.");
+ } catch (ExecException e) {
+ // noop, expecting exceptions
+ } finally {
+ ins.close();
+ close();
+ }
+ }
+
+ @Test
+ public void testInsertOneRow() throws IOException, ExecException,
+ ParseException {
+ fs.delete(path, true);
+ System.out.println("testInsertOneRow");
+ writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+ true, conf);
+ TableInserter ins = writer.getInserter("part1", true);
+
+ Tuple row = TypesUtils.createTuple(writer.getSchema());
+ row.set(0, new String("val1"));
+
+ SortedMap<String, String> map = new TreeMap<String, String>();
+ map.put("john", "boy");
+ row.set(1, map);
+
+ ins.insert(new BytesWritable("key".getBytes()), row);
+
+ ins.close();
+ close();
+ }
+
+ @Test
+ public void testInsert2Rows() throws IOException, ExecException,
+ ParseException {
+ fs.delete(path, true);
+ System.out.println("testInsert2Rows");
+ writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+ true, conf);
+ TableInserter ins = writer.getInserter("part1", true);
+
+ // row 1
+ Tuple row = TypesUtils.createTuple(writer.getSchema());
+ row.set(0, new String("val1"));
+
+ SortedMap<String, String> map = new TreeMap<String, String>();
+ map.put("john", "boy");
+ row.set(1, map);
+
+ ins.insert(new BytesWritable("key".getBytes()), row);
+
+ // row 2
+ TypesUtils.resetTuple(row);
+ row.set(0, new String("val2"));
+ map.put("joe", "boy");
+ map.put("jane", "girl");
+ // map should contain 3 k->v pairs
+ row.set(1, map);
+
+ ins.insert(new BytesWritable("key".getBytes()), row);
+ ins.close();
+
+ ins.close();
+ close();
+ }
+
+ @Test
+ public void testInsert2Inserters() throws IOException, ExecException,
+ ParseException {
+ fs.delete(path, true);
+ System.out.println("testInsert2Inserters");
+ writer = new ColumnGroup.Writer(path, "abc, def", false, "pig", "lzo2",
+ true, conf);
+ TableInserter ins1 = writer.getInserter("part1", true);
+ TableInserter ins2 = writer.getInserter("part2", true);
+
+ // row 1
+ Tuple row = TypesUtils.createTuple(writer.getSchema());
+ row.set(0, new String("val1"));
+
+ SortedMap<String, String> map = new TreeMap<String, String>();
+ map.put("john", "boy");
+
+ ins1.insert(new BytesWritable("key11".getBytes()), row);
+ ins2.insert(new BytesWritable("key21".getBytes()), row);
+
+ // row 2
+ TypesUtils.resetTuple(row);
+ row.set(0, new String("val2"));
+ map.put("joe", "boy");
+ map.put("jane", "girl");
+ // map should contain 3 k->v pairs
+ row.set(1, map);
+
+ ins2.insert(new BytesWritable("key22".getBytes()), row);
+ // ins2.close();
+ ins1.insert(new BytesWritable("key12".getBytes()), row);
+
+ ins1.close();
+ ins2.close();
+ close();
+ }
+
+ @Test
+ public void testFailureOverlappingKeys() throws IOException, ExecException,
+ ParseException {
+ fs.delete(path, true);
+ System.out.println("testFailureOverlappingKeys");
+ writer = new ColumnGroup.Writer(path, "abc, def ", true, "pig", "lzo2",
+ true, conf);
+ TableInserter ins1 = writer.getInserter("part1", false);
+ TableInserter ins2 = writer.getInserter("part2", false);
+
+ // row 1
+
+ Tuple row = TypesUtils.createTuple(writer.getSchema());
+ row.set(0, new String("val1"));
+
+ SortedMap<String, String> map = new TreeMap<String, String>();
+ map.put("john", "boy");
+ row.set(1, map);
+
+ ins1.insert(new BytesWritable("key1".getBytes()), row);
+ ins2.insert(new BytesWritable("key2".getBytes()), row);
+
+ // row 2
+ TypesUtils.resetTuple(row);
+ row.set(0, new String("val2"));
+ map.put("joe", "boy");
+ map.put("jane", "girl");
+ // map should contain 3 k->v pairs
+ row.set(1, map);
+
+ ins2.insert(new BytesWritable("key3".getBytes()), row);
+ // ins2.close();
+ ins1.insert(new BytesWritable("key4".getBytes()), row);
+ try {
+ ins1.close();
+ ins2.close();
+ close();
+ Assert.fail("Failed to detect overlapping keys.");
+ } catch (IOException e) {
+ // noop, exceptions expected
+ } finally {
+ ColumnGroup.drop(path, conf);
+ }
+ }
+
+ private static void finish() throws IOException {
+ if (writer != null) {
+ writer.finish();
+ }
+ }
+
+ private static void close() throws IOException {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ ColumnGroup.drop(path, conf);
+ }
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupOpen.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestColumnGroupOpen {
+ final static String outputFile = "TestColumnGroupOpen";
+ final private static Configuration conf = new Configuration();
+ private static FileSystem fs;
+ private static Path path;
+ private static ColumnGroup.Writer writer;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ // set default file system to local file system
+ conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+
+ // must set a conf here to the underlying FS, or it barks
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ rawLFS.setConf(conf);
+ fs = new LocalFileSystem(rawLFS);
+ path = new Path(fs.getWorkingDirectory(), outputFile);
+ System.out.println("output file: " + path);
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ finish();
+ }
+
+ @Test
+ public void testNew() throws IOException, ParseException {
+ System.out.println("testNew");
+ writer = new ColumnGroup.Writer(path, "abc, def ", false, "pig", "lzo2",
+ true, conf);
+ // NOTE: don't call writer.close() here
+ close();
+ }
+
+ @Test
+ public void testFailureExistingSortedDiff() throws IOException,
+ ParseException {
+ System.out.println("testFailureExistingSortedDiff");
+ try {
+ writer = new ColumnGroup.Writer(path, "abc, def ", false, "pig", "lzo2",
+ true, conf);
+ finish();
+ writer = new ColumnGroup.Writer(path, "abc, def", true, "pig", "lzo2",
+ false, conf);
+ Assert.fail("Failed to catch sorted flag alteration.");
+ } catch (IOException e) {
+ // noop, expecting exceptions
+ } finally {
+ close();
+ }
+ }
+
+ @Test
+ public void testExisting() throws IOException, ParseException {
+ System.out.println("testExisting");
+ writer = new ColumnGroup.Writer(path, " abc , def ", false, "pig", "lzo2",
+ false, conf);
+ writer.close();
+ close();
+ }
+
+ @Test
+ public void testFailurePathNotDir() throws IOException, ParseException {
+ System.out.println("testFailurePathNotDir");
+ try {
+ // fs.delete(path, true);
+ ColumnGroup.drop(path, conf);
+
+ FSDataOutputStream in = fs.create(path);
+ in.close();
+ writer = new ColumnGroup.Writer(path, " abc , def ", false, "pig",
+ "lzo2", false, conf);
+ Assert.fail("Failed to catch path not a directory.");
+ } catch (IOException e) {
+ // noop, expecting exceptions
+ } finally {
+ close();
+ }
+ }
+
+ @Test
+ public void testFailureMetaFileExists() throws IOException, ParseException {
+ System.out.println("testFailureMetaFileExists");
+ try {
+ fs.delete(path, true);
+ FSDataOutputStream in = fs.create(new Path(path, ColumnGroup.META_FILE));
+ in.close();
+ writer = new ColumnGroup.Writer(path, "abc", false, "pig", "lzo2", false,
+ conf);
+ Assert.fail("Failed to catch meta file existence.");
+ } catch (IOException e) {
+ // noop, expecting exceptions
+ } finally {
+ close();
+ }
+ }
+
+ @Test
+ public void testFailureDiffSchema() throws IOException, ParseException {
+ System.out.println("testFailureDiffSchema");
+ try {
+ writer = new ColumnGroup.Writer(path, "abc", false, "pig", "lzo2", false,
+ conf);
+ writer.finish();
+ writer = new ColumnGroup.Writer(path, "efg", false, "pig", "lzo2", false,
+ conf);
+ Assert.fail("Failed to catch schema differences.");
+ } catch (IOException e) {
+ // noop, expecting exceptions
+ } finally {
+ close();
+ }
+ }
+
+ @Test
+ public void testMultiWriters() throws IOException, ParseException {
+ System.out.println("testMultiWriters");
+ ColumnGroup.Writer writer1 = null;
+ ColumnGroup.Writer writer2 = null;
+ ColumnGroup.Writer writer3 = null;
+ try {
+ writer1 = new ColumnGroup.Writer(path, "abc", false, "pig", "lzo2", true,
+ conf);
+ writer2 = new ColumnGroup.Writer(path, conf);
+ writer3 = new ColumnGroup.Writer(path, conf);
+
+ TableInserter ins1 = writer1.getInserter("part1", false);
+ TableInserter ins2 = writer2.getInserter("part2", false);
+ TableInserter ins3 = writer3.getInserter("part3", false);
+ ins1.close();
+ ins2.close();
+ ins3.close();
+ // }
+ // catch (IOException e) {
+ // // noop, expecting exceptions
+ // throw e;
+ } finally {
+ if (writer1 != null) {
+ writer1.finish();
+ }
+ if (writer2 != null) {
+ writer2.finish();
+ }
+ if (writer3 != null) {
+ writer3.finish();
+ }
+ close();
+ }
+ }
+
+ private static void finish() throws IOException {
+ if (writer != null) {
+ writer.finish();
+ }
+ }
+
+ private static void close() throws IOException {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ ColumnGroup.drop(path, conf);
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests column projections on a ColumnGroup.
+ *
+ * {@link #setUpOnce()} writes a single-part group with schema "a,b,c,d,e,f,g"
+ * holding two rows: key "k1" with values a1..g1 and key "k2" with values
+ * a2..g2. Each test reads the group back through a different projection.
+ */
+public class TestColumnGroupProjections {
+  final static String outputFile = "TestColumnGroupProjections";
+  final static private Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static Path path;
+  private static Schema schema;
+
+  /** Column names of the test schema, in order; must match the schema string. */
+  private static final String[] COLUMN_NAMES = { "a", "b", "c", "d", "e",
+      "f", "g" };
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException, ParseException {
+    // set default file system to local file system
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+
+    // must set a conf here to the underlying FS, or it barks
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    rawLFS.setConf(conf);
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), outputFile);
+    System.out.println("output file: " + path);
+
+    schema = new Schema("a,b,c,d,e,f,g");
+
+    ColumnGroup.Writer writer = new ColumnGroup.Writer(path, schema, false,
+        "pig", "lzo2", true, conf);
+    try {
+      TableInserter ins = writer.getInserter("part0", true);
+      try {
+        Tuple row = TypesUtils.createTuple(writer.getSchema());
+        insertRow(ins, row, "1"); // k1 -> a1..g1
+        TypesUtils.resetTuple(row);
+        insertRow(ins, row, "2"); // k2 -> a2..g2
+      } finally {
+        ins.close();
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Removes the column group written by setUpOnce(). */
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+    // path stays null if setUpOnce() failed before creating it
+    if (path != null) {
+      ColumnGroup.drop(path, conf);
+    }
+  }
+
+  /**
+   * Sets field i of {@code row} to COLUMN_NAMES[i] + suffix for every column
+   * and inserts the row under key "k" + suffix.
+   */
+  private static void insertRow(TableInserter ins, Tuple row, String suffix)
+      throws IOException {
+    for (int i = 0; i < COLUMN_NAMES.length; ++i) {
+      row.set(i, COLUMN_NAMES[i] + suffix);
+    }
+    ins.insert(new BytesWritable(("k" + suffix).getBytes()), row);
+  }
+
+  /** Reads the scanner's current key into {@code key} and checks its bytes. */
+  private static void assertKey(TableScanner scanner, BytesWritable key,
+      String expected) throws IOException {
+    scanner.getKey(key);
+    Assert.assertEquals(new BytesWritable(expected.getBytes()), key);
+  }
+
+  /**
+   * Asserts that reading field {@code index} of {@code row} throws an
+   * ExecException, i.e. the projected tuple has no field at that index.
+   */
+  private static void assertOutOfBounds(Tuple row, int index, String message) {
+    try {
+      row.get(index);
+      Assert.fail(message);
+    } catch (ExecException e) {
+      // expected: out of bounds
+    }
+  }
+
+  @Test
+  // default projection (without any projection) should return every column
+  public void testDefaultProjection() throws IOException, ExecException,
+      ParseException {
+    System.out.println("testDefaultProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      TableScanner scanner = reader.getScanner(null, false);
+      try {
+        defTest(reader, scanner);
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Checks that {@code scanner} yields both rows with every column projected,
+   * spot-checking columns a, b and g.
+   */
+  private void defTest(ColumnGroup.Reader reader, TableScanner scanner)
+      throws IOException, ParseException {
+    BytesWritable key = new BytesWritable();
+    Tuple row = TypesUtils.createTuple(reader.getSchema());
+
+    assertKey(scanner, key, "k1");
+    scanner.getValue(row);
+    Assert.assertEquals("a1", row.get(0));
+    Assert.assertEquals("b1", row.get(1));
+    Assert.assertEquals("g1", row.get(6));
+
+    // move to next row
+    scanner.advance();
+    assertKey(scanner, key, "k2");
+    TypesUtils.resetTuple(row);
+    scanner.getValue(row);
+    Assert.assertEquals("a2", row.get(0));
+    Assert.assertEquals("b2", row.get(1));
+    Assert.assertEquals("g2", row.get(6));
+  }
+
+  @Test
+  // null projection should be same as default (fully projected) projection
+  public void testNullProjection() throws IOException, ExecException,
+      ParseException {
+    System.out.println("testNullProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection(null);
+      TableScanner scanner = reader.getScanner(null, false);
+      try {
+        defTest(reader, scanner);
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  // empty projection should project no columns at all
+  public void testEmptyProjection() throws Exception {
+    System.out.println("testEmptyProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection("");
+
+      Tuple row = TypesUtils.createTuple(reader.getSchema());
+      TableScanner scanner = reader.getScanner(null, false);
+      try {
+        BytesWritable key = new BytesWritable();
+
+        assertKey(scanner, key, "k1");
+        scanner.getValue(row);
+        assertOutOfBounds(row, 0, "Failed to catch out of boundary exceptions.");
+
+        scanner.advance();
+        assertKey(scanner, key, "k2");
+        TypesUtils.resetTuple(row);
+        scanner.getValue(row);
+        // same projection, so the second row must be empty as well
+        assertOutOfBounds(row, 0, "Failed to catch out of boundary exceptions.");
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  // a column name that does not exist in the column group
+  public void testOneNonExistentProjection() throws Exception {
+    System.out.println("testOneNonExistentProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection("X");
+
+      Tuple row = TypesUtils.createTuple(1);
+      TableScanner scanner = reader.getScanner(null, false);
+      try {
+        BytesWritable key = new BytesWritable();
+
+        assertKey(scanner, key, "k1");
+        scanner.getValue(row);
+        Assert.assertNull(row.get(0));
+        assertOutOfBounds(row, 1, "Failed to catch out of boundary exceptions.");
+
+        scanner.advance();
+        assertKey(scanner, key, "k2");
+        TypesUtils.resetTuple(row);
+        scanner.getValue(row);
+        // the unknown column reads back as null for every row
+        Assert.assertNull(row.get(0));
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  // normal one column projection
+  public void testOneColumnProjection() throws Exception {
+    System.out.println("testOneColumnProjection");
+    // test without beginKey/endKey
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection("c");
+
+      Tuple row = TypesUtils.createTuple(reader.getSchema());
+      TableScanner scanner = reader.getScanner(null, false);
+      try {
+        BytesWritable key = new BytesWritable();
+
+        assertKey(scanner, key, "k1");
+        scanner.getValue(row);
+        Assert.assertEquals("c1", row.get(0));
+        assertOutOfBounds(row, 1, "Failed to catch 'out of boundary' exceptions.");
+
+        scanner.advance();
+        assertKey(scanner, key, "k2");
+        TypesUtils.resetTuple(row);
+        scanner.getValue(row);
+        Assert.assertEquals("c2", row.get(0));
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  // normal one column plus a non-existent column projection
+  public void testOnePlusNonProjection() throws Exception {
+    System.out.println("testOnePlusNonProjection");
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection(",f");
+
+      Tuple row = TypesUtils.createTuple(reader.getSchema());
+      TableScanner scanner = reader.getScanner(null, false);
+      try {
+        BytesWritable key = new BytesWritable();
+
+        assertKey(scanner, key, "k1");
+        scanner.getValue(row);
+        Assert.assertNull(row.get(0));
+        Assert.assertEquals("f1", row.get(1));
+        assertOutOfBounds(row, 2, "Failed to catch 'out of boundary' exceptions.");
+
+        scanner.advance();
+        assertKey(scanner, key, "k2");
+        TypesUtils.resetTuple(row);
+        scanner.getValue(row);
+        Assert.assertNull(row.get(0));
+        Assert.assertEquals("f2", row.get(1));
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  // two normal columns projected
+  public void testTwoColumnsProjection() throws Exception {
+    System.out.println("testTwoColumnsProjection");
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      reader.setProjection("f,a");
+
+      Tuple row = TypesUtils.createTuple(reader.getSchema());
+      TableScanner scanner = reader.getScanner(null, false);
+      try {
+        BytesWritable key = new BytesWritable();
+
+        assertKey(scanner, key, "k1");
+        scanner.getValue(row);
+        Assert.assertEquals("f1", row.get(0));
+        Assert.assertEquals("a1", row.get(1));
+        assertOutOfBounds(row, 2, "Failed to catch 'out of boundary' exceptions.");
+
+        scanner.advance();
+        assertKey(scanner, key, "k2");
+        TypesUtils.resetTuple(row);
+        scanner.getValue(row);
+        Assert.assertEquals("f2", row.get(0));
+        Assert.assertEquals("a2", row.get(1));
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+}