Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC
svn commit: r803312 [13/16] - in /hadoop/pig/trunk: ./ contrib/zebra/
contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/
contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/
contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/...
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,996 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.mapred.ArticleGenerator.Summary;
+import org.apache.hadoop.zebra.mapred.TestBasicTableIOFormatLocalFS.FreqWordCache.Item;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Testing BasicTableOutputFormat and TableInputFormat using Local FS
+ */
+public class TestBasicTableIOFormatLocalFS extends TestCase {
+ static class Options {
+ int taskTrackers = 4;
+ int dataNodes = 4;
+ int srcFiles = 7;
+ int numBatches = 1;
+ int srcFileLen = 2 * 1024; // 2KB
+ int numMapper = 3;
+ int numReducer = 2;
+ int numFreqWords = 10;
+ long minTableSplitSize = 16 * 1024L;
+ boolean localFS = true;
+ String compression = "none";
+ String rootPath = "TestBasicTableIOFormat";
+ String srcPath = "docs";
+ String fwdIndexRootPath = "fwdIndex";
+ String invIndexTablePath = "invIndex";
+ String freqWordTablePath = "freqWord";
+ }
+
+ static Log LOG = LogFactory.getLog(TestBasicTableIOFormatLocalFS.class
+ .getName());
+
+ Options options;
+ Configuration conf;
+ MiniDFSCluster dfs;
+ FileSystem fileSys;
+ Path rootPath;
+ Path srcPath;
+ Path fwdIndexRootPath;
+ Path invIndexTablePath;
+ Path freqWordTablePath;
+ MiniMRCluster mr;
+ ArticleGenerator articalGen;
+ Map<String, Summary> summary;
+
+ @Override
+ protected void setUp() throws IOException {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ Path path = new Path(".");
+ System
+ .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (options == null) {
+ options = new Options();
+ }
+
+ if (conf == null) {
+ conf = new Configuration();
+ }
+
+ articalGen = new ArticleGenerator(1000, 1, 20, 100);
+ summary = new HashMap<String, Summary>();
+
+ if (options.localFS) {
+ Path localFSRootPath = new Path(System.getProperty("test.build.data",
+ "build/test/data/work-dir"));
+ fileSys = localFSRootPath.getFileSystem(conf);
+ rootPath = new Path(localFSRootPath, options.rootPath);
+ mr = new MiniMRCluster(options.taskTrackers, "file:///", 3);
+ } else {
+ dfs = new MiniDFSCluster(conf, options.dataNodes, true, null);
+ fileSys = dfs.getFileSystem();
+ rootPath = new Path(options.rootPath);
+ mr = new MiniMRCluster(options.taskTrackers, fileSys.getUri().toString(),
+ 1);
+ }
+ conf = getJobConf("TestBasicTableIOFormat");
+ srcPath = new Path(rootPath, options.srcPath);
+ fwdIndexRootPath = new Path(rootPath, options.fwdIndexRootPath);
+ invIndexTablePath = new Path(rootPath, options.invIndexTablePath);
+ freqWordTablePath = new Path(rootPath, options.freqWordTablePath);
+ }
+
+ @Override
+ protected void tearDown() throws IOException {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+
+ JobConf getJobConf(String name) {
+ JobConf jobConf = mr.createJobConf();
+ jobConf.setJobName(name);
+ jobConf.set("table.output.tfile.compression", options.compression);
+ jobConf.setInt("mapred.app.freqWords.count", options.numFreqWords);
+ return jobConf;
+ }
+
+ /**
+ * Create a bunch of text files under a sub-directory of the srcPath. The
+ * sub-directory is named after the batch name.
+ *
+ * @param batchName
+ * The batch name.
+ * @throws IOException
+ */
+ void createSourceFiles(String batchName) throws IOException {
+ LOG.info("Creating source data folder: " + batchName);
+ Path batchDir = new Path(srcPath, batchName);
+ LOG.info("Cleaning directory: " + batchName);
+ fileSys.delete(batchDir, true);
+ LOG.info("Generating input files: " + batchName);
+ articalGen.batchArticalCreation(fileSys, new Path(srcPath, batchName),
+ "doc-", options.srcFiles, options.srcFileLen);
+ Summary s = articalGen.getSummary();
+ // dumpSummary(s);
+ long tmp = 0;
+ for (Long wordCnt : s.wordCntDist.values()) {
+ tmp += wordCnt;
+ }
+ Assert.assertEquals(tmp, s.wordCount);
+ summary.put(batchName, s);
+ articalGen.resetSummary();
+ }
+
+ /**
+ * Generate forward index. Map-only task.
+ *
+ * <pre>
+ * Map Input:
+ * K = LongWritable (byte offset)
+ * V = Text (text line)
+ * Map Output:
+ * K = word: String.
+ * V = Tuple of {fileName:String, wordPos:Integer, lineNo:Integer }
+ * </pre>
+ */
+ static class ForwardIndexGen {
+ static class MapClass implements
+ Mapper<LongWritable, Text, BytesWritable, Tuple> {
+ private BytesWritable outKey;
+ private Tuple outRow;
+ // index into the output tuple for fileName, wordPos, lineNo.
+ private int idxFileName, idxWordPos, idxLineNo;
+ private String filePath;
+ // maintain line number and word position.
+ private int lineNo = 0;
+ private int wordPos = 0;
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ throws IOException {
+ if (filePath == null) {
+ FileSplit split = (FileSplit) reporter.getInputSplit();
+ filePath = split.getPath().toString();
+ }
+ String line = value.toString();
+ StringTokenizer st = new StringTokenizer(line, " ");
+ while (st.hasMoreElements()) {
+ byte[] word = st.nextToken().getBytes();
+ outKey.set(word, 0, word.length);
+ TypesUtils.resetTuple(outRow);
+ try {
+ outRow.set(idxFileName, filePath);
+ outRow.set(idxWordPos, new Integer(wordPos));
+ outRow.set(idxLineNo, new Integer(lineNo));
+ output.collect(outKey, outRow);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+
+ ++wordPos;
+ }
+ ++lineNo;
+
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ LOG.info("ForwardIndexGen.MapClass.configure");
+ outKey = new BytesWritable();
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ outRow = TypesUtils.createTuple(outSchema);
+ idxFileName = outSchema.getColumnIndex("fileName");
+ idxWordPos = outSchema.getColumnIndex("wordPos");
+ idxLineNo = outSchema.getColumnIndex("lineNo");
+ } catch (IOException e) {
+ throw new RuntimeException("Schema parsing failed : "
+ + e.getMessage());
+ } catch (ParseException e) {
+ throw new RuntimeException("Schema parsing failed : "
+ + e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+ }
+ }
+
+ /**
+ * Run forward index generation.
+ *
+ * @param batchName
+ * @throws IOException
+ */
+ void runForwardIndexGen(String batchName) throws IOException, ParseException {
+ LOG.info("Run Map-only job to convert source data to forward index: "
+ + batchName);
+
+ JobConf jobConf = getJobConf("fwdIndexGen-" + batchName);
+
+ // input-related settings
+ jobConf.setInputFormat(TextInputFormat.class);
+ jobConf.setMapperClass(ForwardIndexGen.MapClass.class);
+ FileInputFormat.setInputPaths(jobConf, new Path(srcPath, batchName));
+ jobConf.setNumMapTasks(options.numMapper);
+
+ // output related settings
+ Path outPath = new Path(fwdIndexRootPath, batchName);
+ fileSys.delete(outPath, true);
+ jobConf.setOutputFormat(BasicTableOutputFormat.class);
+ BasicTableOutputFormat.setOutputPath(jobConf, outPath);
+ BasicTableOutputFormat.setSchema(jobConf, "fileName, wordPos, lineNo");
+
+ // set map-only job.
+ jobConf.setNumReduceTasks(0);
+ JobClient.runJob(jobConf);
+ }
+
+ /**
+ * Count the # of rows of a BasicTable
+ *
+ * @param tablePath
+ * The path to the BasicTable
+ * @return Number of rows.
+ * @throws IOException
+ */
+ long countRows(Path tablePath) throws IOException, ParseException {
+ BasicTable.Reader reader = new BasicTable.Reader(tablePath, conf);
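+ // The empty projection keeps the scan cheap: the scanner is only advanced to
+ // count rows, so no column data needs to be materialized.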
+ reader.setProjection("");
+ long totalRows = 0;
+ TableScanner scanner = reader.getScanner(null, true);
+ for (; !scanner.atEnd(); scanner.advance()) {
+ ++totalRows;
+ }
+ scanner.close();
+ return totalRows;
+ }
+
+ /**
+ * Given a batch ID, return the batch name.
+ *
+ * @param i
+ * batch ID
+ * @return Batch name.
+ */
+ static String batchName(int i) {
+ return String.format("batch-%03d", i);
+ }
+
+ /**
+ * Inverted index for one word.
+ */
+ static class InvIndex implements Writable {
+ int count = 0;
+ // a map from filePath to all occurrences of the word positions.
+ Map<String, ArrayList<Integer>> index;
+
+ InvIndex() {
+ index = new TreeMap<String, ArrayList<Integer>>();
+ }
+
+ InvIndex(String fileName, int pos) {
+ this();
+ add(fileName, pos);
+ }
+
+ void add(String fileName, int pos) {
+ ++count;
+ ArrayList<Integer> list = index.get(fileName);
+ if (list == null) {
+ list = new ArrayList<Integer>(1);
+ index.put(fileName, list);
+ }
+ list.add(pos);
+ }
+
+ void add(String fileName, ArrayList<Integer> positions) {
+ ArrayList<Integer> list = index.get(fileName);
+ if (list == null) {
+ list = new ArrayList<Integer>();
+ index.put(fileName, list);
+ }
+ count += positions.size();
+ list.ensureCapacity(list.size() + positions.size());
+ list.addAll(positions);
+ }
+
+ void reduce(InvIndex other) {
+ for (Iterator<Map.Entry<String, ArrayList<Integer>>> it = other.index
+ .entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, ArrayList<Integer>> e = it.next();
+ add(e.getKey(), e.getValue());
+ }
+ }
+
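+ // Serialized layout: the total count as a vint, then (when the count is
+ // non-zero) the number of files, and for each file its name, the number of
+ // word positions, and the positions themselves.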
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ count = 0;
+ index.clear();
+ count = Utils.readVInt(in);
+ if (count > 0) {
+ int n = Utils.readVInt(in);
+ for (int i = 0; i < n; ++i) {
+ String fileName = Utils.readString(in);
+ int m = Utils.readVInt(in);
+ ArrayList<Integer> list = new ArrayList<Integer>(m);
+ index.put(fileName, list);
+ for (int j = 0; j < m; ++j) {
+ list.add(Utils.readVInt(in));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Utils.writeVInt(out, count);
+ if (count > 0) {
+ Utils.writeVInt(out, index.size());
+ for (Iterator<Map.Entry<String, ArrayList<Integer>>> it = index
+ .entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, ArrayList<Integer>> e = it.next();
+ Utils.writeString(out, e.getKey());
+ ArrayList<Integer> list = e.getValue();
+ Utils.writeVInt(out, list.size());
+ for (Iterator<Integer> it2 = list.iterator(); it2.hasNext();) {
+ Utils.writeVInt(out, it2.next());
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Generate inverted Index.
+ *
+ * <pre>
+ * Mapper Input =
+ * K: BytesWritable word;
+ * V: Tuple { fileName:String, wordPos:Integer };
+ *
+ * Mapper Output =
+ * K: BytesWritable word;
+ * V: InvIndex;
+ *
+ * Reducer Output =
+ * K: BytesWritable word;
+ * V: Tuple {count:Integer, index: Map of {fileName:String, Bag of {wordPos:Integer}}};
+ * </pre>
+ */
+ static class InvertedIndexGen {
+ static class MapClass implements
+ Mapper<BytesWritable, Tuple, BytesWritable, InvIndex> {
+ // index of fileName and wordPos fields of the input tuple
+ int idxFileName, idxWordPos;
+
+ @Override
+ public void map(BytesWritable key, Tuple value,
+ OutputCollector<BytesWritable, InvIndex> output, Reporter reporter)
+ throws IOException {
+ try {
+ String fileName = (String) value.get(idxFileName);
+ int wordPos = (Integer) value.get(idxWordPos);
+ output.collect(key, new InvIndex(fileName, wordPos));
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ LOG.info("InvertedIndexGen.MapClass.configure");
+ String projection;
+ try {
+ projection = TableInputFormat.getProjection(job);
+ } catch (ParseException e) {
+ throw new RuntimeException("Schema parsing failed : "
+ + e.getMessage());
+ } catch (IOException e) {
+ throw new RuntimeException("TableInputFormat.getProjection", e);
+ }
+ idxFileName = Projection.getColumnIndex(projection, "fileName");
+ idxWordPos = Projection.getColumnIndex(projection, "wordPos");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+ }
+
+ static class CombinerClass implements
+ Reducer<BytesWritable, InvIndex, BytesWritable, InvIndex> {
+
+ @Override
+ public void reduce(BytesWritable key, Iterator<InvIndex> values,
+ OutputCollector<BytesWritable, InvIndex> output, Reporter reporter)
+ throws IOException {
+ InvIndex sum = new InvIndex();
+ for (; values.hasNext();) {
+ sum.reduce(values.next());
+ }
+ output.collect(key, sum);
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ LOG.info("InvertedIndexGen.CombinerClass.configure");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+ }
+
+ static class ReduceClass implements
+ Reducer<BytesWritable, InvIndex, BytesWritable, Tuple> {
+ Tuple outRow;
+ int idxCount, idxIndex;
+ Schema wordPosSchema;
+ int idxWordPos;
+
+ @Override
+ public void configure(JobConf job) {
+ LOG.info("InvertedIndexGen.ReduceClass.configure");
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ outRow = TypesUtils.createTuple(outSchema);
+ idxCount = outSchema.getColumnIndex("count");
+ idxIndex = outSchema.getColumnIndex("index");
+ wordPosSchema = new Schema("wordPos");
+ idxWordPos = wordPosSchema.getColumnIndex("wordPos");
+ } catch (IOException e) {
+ throw new RuntimeException("Schema parsing failed :" + e.getMessage());
+ } catch (ParseException e) {
+ throw new RuntimeException("Schema parsing failed :" + e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ Map<String, DataBag> convertInvIndex(Map<String, ArrayList<Integer>> index)
+ throws IOException {
+ Map<String, DataBag> ret = new TreeMap<String, DataBag>();
+ for (Iterator<Map.Entry<String, ArrayList<Integer>>> it = index
+ .entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, ArrayList<Integer>> e = it.next();
+ DataBag bag = TypesUtils.createBag();
+ for (Iterator<Integer> it2 = e.getValue().iterator(); it2.hasNext();) {
+ Tuple tuple = TypesUtils.createTuple(wordPosSchema);
+ tuple.set(idxWordPos, it2.next());
+ bag.add(tuple);
+ }
+ ret.put(e.getKey(), bag);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void reduce(BytesWritable key, Iterator<InvIndex> values,
+ OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ throws IOException {
+ InvIndex sum = new InvIndex();
+ for (; values.hasNext();) {
+ sum.reduce(values.next());
+ }
+ try {
+ outRow.set(idxCount, sum.count);
+ outRow.set(idxIndex, convertInvIndex(sum.index));
+ output.collect(key, outRow);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ void runInvertedIndexGen() throws IOException, ParseException {
+ LOG.info("Converting forward index to inverted index");
+ JobConf jobConf = getJobConf("runInvertedIndexGen");
+
+ // input-related settings
+ jobConf.setInputFormat(TableInputFormat.class);
+ jobConf.setMapperClass(InvertedIndexGen.MapClass.class);
+ Path[] paths = new Path[options.numBatches];
+ for (int i = 0; i < options.numBatches; ++i) {
+ paths[i] = new Path(fwdIndexRootPath, batchName(i));
+ }
+ TableInputFormat.setInputPaths(jobConf, paths);
+ // TableInputFormat.setProjection(jobConf, "fileName, wordPos");
+ TableInputFormat.setMinSplitSize(jobConf, options.minTableSplitSize);
+ jobConf.setNumMapTasks(options.numMapper);
+ jobConf.setMapOutputKeyClass(BytesWritable.class);
+ jobConf.setMapOutputValueClass(InvIndex.class);
+
+ // output related settings
+ fileSys.delete(invIndexTablePath, true);
+ jobConf.setOutputFormat(BasicTableOutputFormat.class);
+ jobConf.setReducerClass(InvertedIndexGen.ReduceClass.class);
+ jobConf.setCombinerClass(InvertedIndexGen.CombinerClass.class);
+ BasicTableOutputFormat.setOutputPath(jobConf, invIndexTablePath);
+ BasicTableOutputFormat.setSchema(jobConf, "count, index");
+ jobConf.setNumReduceTasks(options.numReducer);
+
+ JobClient.runJob(jobConf);
+ }
+
+ void reduce(Summary sum, Summary delta) {
+ sum.lineCount += delta.lineCount;
+ sum.wordCount += delta.wordCount;
+ reduce(sum.wordCntDist, delta.wordCntDist);
+ }
+
+ void reduce(Map<String, Long> sum, Map<String, Long> delta) {
+ for (Iterator<Map.Entry<String, Long>> it = delta.entrySet().iterator(); it
+ .hasNext();) {
+ Map.Entry<String, Long> e = it.next();
+ String key = e.getKey();
+ Long base = sum.get(key);
+ sum.put(key, (base == null) ? e.getValue() : base + e.getValue());
+ }
+ }
+
+ void dumpSummary(Summary s) {
+ LOG.info("Dumping Summary");
+ LOG.info("Word Count: " + s.wordCount);
+ for (Iterator<Map.Entry<String, Long>> it = s.wordCntDist.entrySet()
+ .iterator(); it.hasNext();) {
+ Map.Entry<String, Long> e = it.next();
+ LOG.info(e.getKey() + "->" + e.getValue());
+ }
+ }
+
+ /**
+ * Verify that the word counts from the invIndexTable are the same as those
+ * collected from the ArticleGenerator.
+ *
+ * @throws IOException
+ */
+ void verifyWordCount() throws IOException, ParseException {
+ Summary expected = new Summary();
+ for (Iterator<Summary> it = summary.values().iterator(); it.hasNext();) {
+ Summary e = it.next();
+ // dumpSummary(e);
+ reduce(expected, e);
+ }
+ // LOG.info("Dumping aggregated Summary");
+ // dumpSummary(expected);
+
+ Summary actual = new Summary();
+ BasicTable.Reader reader = new BasicTable.Reader(invIndexTablePath, conf);
+ reader.setProjection("count");
+ TableScanner scanner = reader.getScanner(null, true);
+ Tuple tuple = TypesUtils.createTuple(Projection.toSchema(scanner
+ .getProjection()));
+ BytesWritable key = new BytesWritable();
+ for (; !scanner.atEnd(); scanner.advance()) {
+ scanner.getKey(key);
+ scanner.getValue(tuple);
+ int count = 0;
+ try {
+ count = (Integer) tuple.get(0);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ actual.wordCount += count;
+ String word = new String(key.get(), 0, key.getSize());
+ actual.wordCntDist.put(word, (long) count);
+ }
+ scanner.close();
+ // LOG.info("Dumping MR calculated Summary");
+ // dumpSummary(actual);
+ Assert.assertEquals(expected.wordCount, actual.wordCount);
+ Assert.assertEquals(expected.wordCntDist.size(), actual.wordCntDist.size());
+ for (Iterator<Map.Entry<String, Long>> it = expected.wordCntDist.entrySet()
+ .iterator(); it.hasNext();) {
+ Map.Entry<String, Long> e = it.next();
+ String word = e.getKey();
+ Long myCount = actual.wordCntDist.get(word);
+ Assert.assertFalse(word, myCount == null);
+ Assert.assertEquals(word, e.getValue(), myCount);
+ }
+ }
+
+ /**
+ * Caching the top K frequent words.
+ */
+ static class FreqWordCache {
+ static class Item {
+ BytesWritable word;
+ int count;
+
+ Item(BytesWritable w, int c) {
+ word = new BytesWritable();
+ word.set(w.get(), 0, w.getSize());
+ count = c;
+ }
+ }
+
+ int k;
+ PriorityQueue<Item> words;
+
+ FreqWordCache(int k) {
+ if (k <= 0) {
+ throw new IllegalArgumentException("Expecting positive int");
+ }
+ this.k = k;
+ words = new PriorityQueue<Item>(k, new Comparator<Item>() {
+ @Override
+ public int compare(Item o1, Item o2) {
+ if (o1.count != o2.count) {
+ return o1.count - o2.count;
+ }
+ return -o1.word.compareTo(o2.word);
+ }
+ });
+ }
+
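+ // The queue is ordered by ascending count, so its head is the least frequent
+ // cached word. Adding a word evicts lower-count entries once the cache holds
+ // k items; a word is kept when there is room or when it ties the current
+ // minimum count, so all words sharing the k-th highest count survive.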
+ void add(BytesWritable word, int cnt) {
+ while ((words.size() >= k) && words.peek().count < cnt) {
+ words.poll();
+ }
+ if ((words.size() < k) || words.peek().count == cnt) {
+ words.add(new Item(word, cnt));
+ }
+ }
+
+ void add(Iterator<BytesWritable> itWords, int cnt) {
+ while ((words.size() >= k) && words.peek().count < cnt) {
+ words.poll();
+ }
+ if ((words.size() < k) || words.peek().count == cnt) {
+ for (; itWords.hasNext();) {
+ words.add(new Item(itWords.next(), cnt));
+ }
+ }
+ }
+
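+ // Drain the queue (ascending count order) into an array, then reverse it in
+ // place so the most frequent words come first.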
+ Item[] toArray() {
+ Item[] ret = new Item[words.size()];
+ for (int i = 0; i < ret.length; ++i) {
+ ret[i] = words.poll();
+ }
+
+ for (int i = 0; i < ret.length / 2; ++i) {
+ Item tmp = ret[i];
+ ret[i] = ret[ret.length - i - 1];
+ ret[ret.length - i - 1] = tmp;
+ }
+
+ return ret;
+ }
+ }
+
+ /**
+ * Get the most frequent words from the inverted index. The mapper uses a
+ * priority queue to keep the most frequent words in memory and outputs the
+ * results in close().
+ *
+ * <pre>
+ * Mapper Input =
+ * K: BytesWritable word;
+ * V: Tuple { count:Integer }.
+ *
+ * Mapper Output =
+ * K: IntWritable: count
+ * V: BytesWritable: word
+ *
+ * Reducer Output =
+ * K: BytesWritable word;
+ * V: Tuple { count:Integer }.
+ * </pre>
+ */
+ static class FreqWords {
+ static int getFreqWordsCount(Configuration conf) {
+ return conf.getInt("mapred.app.freqWords.count", 100);
+ }
+
+ static class MapClass implements
+ Mapper<BytesWritable, Tuple, IntWritable, BytesWritable> {
+ int idxCount;
+ FreqWordCache freqWords;
+ IntWritable intWritable;
+ OutputCollector<IntWritable, BytesWritable> out;
+
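+ // map() only feeds the in-memory cache; the cached top-k words are emitted
+ // through the captured collector in close().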
+ @Override
+ public void map(BytesWritable key, Tuple value,
+ OutputCollector<IntWritable, BytesWritable> output, Reporter reporter)
+ throws IOException {
+ if (out == null)
+ out = output;
+ try {
+ int count = (Integer) value.get(idxCount);
+ freqWords.add(key, count);
+ reporter.progress();
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ LOG.info("FreqWords.MapClass.configure");
+ String inSchema;
+ try {
+ inSchema = TableInputFormat.getProjection(job);
+ } catch (ParseException e) {
+ throw new RuntimeException("Projection parsing failed : "
+ + e.getMessage());
+ } catch (IOException e) {
+ throw new RuntimeException("TableInputFormat.getprojection", e);
+ }
+ idxCount = Projection.getColumnIndex(inSchema, "count");
+ intWritable = new IntWritable();
+ freqWords = new FreqWordCache(getFreqWordsCount(job));
+ }
+
+ @Override
+ public void close() throws IOException {
+ Item[] items = freqWords.toArray();
+ for (Item i : items) {
+ intWritable.set(i.count);
+ out.collect(intWritable, i.word);
+ }
+ }
+ }
+
+ static class CombinerClass implements
+ Reducer<IntWritable, BytesWritable, IntWritable, BytesWritable> {
+ FreqWordCache freqWords;
+ OutputCollector<IntWritable, BytesWritable> out;
+ IntWritable intWritable;
+
+ @Override
+ public void configure(JobConf job) {
+ LOG.info("FreqWords.CombinerClass.configure");
+ freqWords = new FreqWordCache(getFreqWordsCount(job));
+ intWritable = new IntWritable();
+ }
+
+ @Override
+ public void close() throws IOException {
+ Item[] items = freqWords.toArray();
+ for (Item i : items) {
+ intWritable.set(i.count);
+ out.collect(intWritable, i.word);
+ }
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<BytesWritable> values,
+ OutputCollector<IntWritable, BytesWritable> output, Reporter reporter)
+ throws IOException {
+ if (out == null) {
+ out = output;
+ }
+ freqWords.add(values, key.get());
+ reporter.progress();
+ }
+ }
+
+ static class ReduceClass implements
+ Reducer<IntWritable, BytesWritable, BytesWritable, Tuple> {
+ FreqWordCache freqWords;
+ OutputCollector<BytesWritable, Tuple> out;
+ Tuple outRow;
+ int idxCount;
+
+ @Override
+ public void configure(JobConf job) {
+ LOG.info("FreqWords.ReduceClass.configure");
+ freqWords = new FreqWordCache(getFreqWordsCount(job));
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ outRow = TypesUtils.createTuple(outSchema);
+ idxCount = outSchema.getColumnIndex("count");
+ } catch (IOException e) {
+ throw new RuntimeException("Schema parsing failed : "
+ + e.getMessage());
+ } catch (ParseException e) {
+ throw new RuntimeException("Schema parsing failed : "
+ + e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ Item[] items = freqWords.toArray();
+ for (Item i : items) {
+ try {
+ outRow.set(idxCount, new Integer(i.count));
+ out.collect(i.word, outRow);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<BytesWritable> values,
+ OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ throws IOException {
+ if (out == null) {
+ out = output;
+ }
+ freqWords.add(values, key.get());
+ reporter.progress();
+ }
+ }
+ }
+
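+ /**
+ * Inverts the natural IntWritable ordering so that map output keys (word
+ * counts) are sorted in descending order and the reducer sees the most
+ * frequent words first.
+ */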
+ static class InverseIntRawComparator implements RawComparator<IntWritable> {
+ IntWritable.Comparator comparator;
+
+ InverseIntRawComparator() {
+ comparator = new IntWritable.Comparator();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return -comparator.compare(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(IntWritable o1, IntWritable o2) {
+ return -comparator.compare(o1, o2);
+ }
+ }
+
+ void runFreqWords() throws IOException, ParseException {
+ LOG.info("Find the most frequent words");
+ JobConf jobConf = getJobConf("runFreqWords");
+
+ // input-related settings
+ jobConf.setInputFormat(TableInputFormat.class);
+ jobConf.setMapperClass(FreqWords.MapClass.class);
+ TableInputFormat.setInputPaths(jobConf, invIndexTablePath);
+ TableInputFormat.setProjection(jobConf, "count");
+ TableInputFormat.setMinSplitSize(jobConf, options.minTableSplitSize);
+ // jobConf.setNumMapTasks(options.numMapper);
+ jobConf.setNumMapTasks(-1);
+ jobConf.setMapOutputKeyClass(IntWritable.class);
+ jobConf.setMapOutputValueClass(BytesWritable.class);
+ // Set customized output comparator.
+ jobConf.setOutputKeyComparatorClass(InverseIntRawComparator.class);
+
+ // output related settings
+ fileSys.delete(freqWordTablePath, true);
+ jobConf.setOutputFormat(BasicTableOutputFormat.class);
+ jobConf.setReducerClass(FreqWords.ReduceClass.class);
+ jobConf.setCombinerClass(FreqWords.CombinerClass.class);
+ BasicTableOutputFormat.setOutputPath(jobConf, freqWordTablePath);
+ BasicTableOutputFormat.setSchema(jobConf, "count");
+ jobConf.setNumReduceTasks(1);
+
+ JobClient.runJob(jobConf);
+ }
+
+ void printFreqWords() throws IOException, ParseException {
+ LOG.info("Printing the most frequent words");
+ BasicTable.Reader reader = new BasicTable.Reader(freqWordTablePath, conf);
+ TableScanner scanner = reader.getScanner(null, true);
+ BytesWritable key = new BytesWritable();
+ Schema schema = Projection.toSchema(scanner.getProjection());
+ int idxCount = schema.getColumnIndex("count");
+ Tuple value = TypesUtils.createTuple(schema);
+ for (; !scanner.atEnd(); scanner.advance()) {
+ scanner.getKey(key);
+ scanner.getValue(value);
+ try {
+ String word = new String(key.get(), 0, key.getSize());
+ int count = (Integer) value.get(idxCount);
+ LOG.info(String.format("%s\t%d", word, count));
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ scanner.close();
+ }
+
+ /**
+ * Testing BasicTableOutputFormat and TableInputFormat by running a sequence
+ * of MapReduce jobs.
+ *
+ * @throws IOException
+ */
+ public void testBasicTable() throws IOException, ParseException {
+ LOG.info("testBasicTable");
+ LOG.info("testing BasicTableOutputFormat in Map-only job");
+ for (int i = 0; i < options.numBatches; ++i) {
+ String batchName = batchName(i);
+ createSourceFiles(batchName);
+ runForwardIndexGen(batchName);
+ LOG.info("Forward index conversion complete: " + batchName);
+ Assert.assertEquals(summary.get(batchName).wordCount, countRows(new Path(
+ fwdIndexRootPath, batchName)));
+ }
+ runInvertedIndexGen();
+ verifyWordCount();
+ runFreqWords();
+ printFreqWords();
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestCheckin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestCheckin.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestCheckin.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestCheckin.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ TestBasicTableIOFormatLocalFS.class,
+ TestBasicTableIOFormatDFS.class
+})
+
+public class TestCheckin {
+ // the class remains completely empty,
+ // being used only as a holder for the above annotations
+}
+
+
+
+
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBasicTableUnionLoader {
+ protected static ExecType execType = ExecType.MAPREDUCE;
+ private static MiniCluster cluster;
+ protected static PigServer pigServer;
+ private static Path pathWorking, pathTable1, pathTable2;
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System
+ .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ conf = new Configuration();
+ FileSystem fs = cluster.getFileSystem();
+ pathWorking = fs.getWorkingDirectory();
+
+ /*
+ * create 1st basic table;
+ */
+ pathTable1 = new Path(pathWorking, "TestBasicTableUnion" + "1");
+ System.out.println("pathTable1 =" + pathTable1);
+
+ BasicTable.Writer writer = new BasicTable.Writer(pathTable1,
+ "a:string,b,c:string", "[a,b];[c]", false, conf);
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+
+ final int numsBatch = 10;
+ final int numsInserters = 2;
+ TableInserter[] inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key1" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+
+ /*
+ * create 2nd basic table;
+ */
+ pathTable2 = new Path(pathWorking, "TestBasicTableUnion" + "2");
+ System.out.println("pathTable2 =" + pathTable2);
+
+ writer = new BasicTable.Writer(pathTable2, "a:string,b,d:string",
+ "[a,b];[d]", false, conf);
+ schema = writer.getSchema();
+ tuple = TypesUtils.createTuple(schema);
+
+ inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key2" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+ @Test
+ public void testReader() throws ExecException, IOException {
+ /*
+ * remove the hdfs prefix, e.g. "hdfs://localhost.localdomain:42540"; pig
+ * will fill that in.
+ */
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str2 = pathTable2.toString().substring(
+ pathTable2.toString().indexOf("/", 7), pathTable2.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d');";
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int cnt = 0;
+ Tuple cur;
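+ // Rows from the first table are expected to come back first, with column d
+ // null (the first table has no d column); row 21 should be the first row
+ // from the second table, which has no c column.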
+ while (it.hasNext()) {
+ cur = it.next();
+ System.out.println(cur);
+ cnt++;
+ if (cnt == 1) {
+ Assert.assertEquals("0_00", cur.get(0));
+ Assert.assertEquals("0_01", cur.get(1));
+ Assert.assertEquals("0_02", cur.get(2));
+ Assert.assertEquals(null, cur.get(3));
+ }
+ if (cnt == 21) {
+ Assert.assertEquals("0_00", cur.get(0));
+ Assert.assertEquals("0_01", cur.get(1));
+ Assert.assertEquals(null, cur.get(2));
+ Assert.assertEquals("0_02", cur.get(3));
+ }
+ }
+ Assert.assertEquals(cnt, 40);
+ }
+
+}
\ No newline at end of file
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,901 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ *
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ *
+ */
+public class TestBasicUnion {
+ protected static ExecType execType = ExecType.MAPREDUCE;
+ private static MiniCluster cluster;
+ protected static PigServer pigServer;
+ private static Path pathWorking, pathTable1, pathTable2, pathTable3,
+ pathTable4, pathTable5;
+ private static Configuration conf;
+ final static String STR_SCHEMA1 = "a:string,b,c:string,e,f";
+ final static String STR_STORAGE1 = "[a];[c]";
+ final static String STR_SCHEMA2 = "a:string,b,d:string,f,e";
+ final static String STR_STORAGE2 = "[a,b];[d]";
+ final static String STR_SCHEMA3 = "b,a";
+ final static String STR_STORAGE3 = "[a];[b]";
+ final static String STR_SCHEMA4 = "b:string,a,c:string";
+ final static String STR_STORAGE4 = "[a,b];[c]";
+ final static String STR_SCHEMA5 = "b,a:string";
+ final static String STR_STORAGE5 = "[a,b]";
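+ // The five schemas above deliberately differ in column sets and ordering so
+ // the union tests below exercise projections over missing and reordered
+ // columns.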
+
+ @BeforeClass
+ public static void setUpOnce() throws Exception {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System
+ .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ conf = new Configuration();
+ FileSystem fs = cluster.getFileSystem();
+ pathWorking = fs.getWorkingDirectory();
+
+ /*
+ * create 1st basic table;
+ */
+ pathTable1 = new Path(pathWorking, "1");
+ System.out.println("pathTable1 =" + pathTable1);
+
+ BasicTable.Writer writer = new BasicTable.Writer(pathTable1, STR_SCHEMA1,
+ STR_STORAGE1, false, conf);
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+
+ final int numsBatch = 10;
+ final int numsInserters = 2;
+ TableInserter[] inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }// k
+ inserters[i].insert(new BytesWritable(("key1" + i).getBytes()), tuple);
+ }// i
+ }// b
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+
+ /*
+ * create 2nd basic table;
+ */
+ pathTable2 = new Path(pathWorking, "2");
+ System.out.println("pathTable2 =" + pathTable2);
+
+ writer = new BasicTable.Writer(pathTable2, STR_SCHEMA2, STR_STORAGE2,
+ false, conf);
+ schema = writer.getSchema();
+ tuple = TypesUtils.createTuple(schema);
+
+ inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key2" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+
+ /*
+ * create 3rd basic table;
+ */
+ pathTable3 = new Path(pathWorking, "3");
+ System.out.println("pathTable3 =" + pathTable3);
+
+ writer = new BasicTable.Writer(pathTable3, STR_SCHEMA3, STR_STORAGE3,
+ false, conf);
+ schema = writer.getSchema();
+ tuple = TypesUtils.createTuple(schema);
+
+ inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key3" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ /*
+ * create 4th basic table;
+ */
+ pathTable4 = new Path(pathWorking, "4");
+ System.out.println("pathTable4 =" + pathTable4);
+
+ writer = new BasicTable.Writer(pathTable4, STR_SCHEMA4, STR_STORAGE4,
+ false, conf);
+ schema = writer.getSchema();
+ tuple = TypesUtils.createTuple(schema);
+
+ inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key4" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ /*
+ * create 5th basic table;
+ */
+ pathTable5 = new Path(pathWorking, "5");
+ System.out.println("pathTable5 =" + pathTable5);
+
+ writer = new BasicTable.Writer(pathTable5, STR_SCHEMA5, STR_STORAGE5,
+ false, conf);
+ schema = writer.getSchema();
+ tuple = TypesUtils.createTuple(schema);
+
+ inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key5" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws Exception {
+ pigServer.shutdown();
+ }
+
+ // all fields
+ public void testReader1() throws ExecException, IOException {
+ /*
+ * remove the hdfs prefix, e.g. "hdfs://localhost.localdomain:42540"; pig
+ * will fill that in.
+ */
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str2 = pathTable2.toString().substring(
+ pathTable2.toString().indexOf("/", 7), pathTable2.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d');";
+ System.out.println(query);
+ // records = LOAD '/user/jing1234/1,/user/jing1234/2' USING
+ // org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d');
+
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int i = 0;
+ int k = -1;
+ Tuple cur = null;
+ int t = -1;
+ int j = -1;
+
+ while (it.hasNext()) {
+ cur = it.next();
+
+ System.out.println("cur: " + cur);
+ // first table
+ if (i <= 9) {
+ System.out.println("first table first part: " + cur.toString());
+ Assert.assertEquals(i + "_00", cur.get(0));
+ Assert.assertEquals(i + "_01", cur.get(1));
+ Assert.assertEquals(i + "_02", cur.get(2));
+ Assert.assertEquals(null, cur.get(3));
+ }
+ if (i >= 10) {
+ k++;
+ }
+ if (k <= 9 && k >= 0) {
+ System.out.println("first table second part: : " + cur.toString());
+ Assert.assertEquals(k + "_10", cur.get(0));
+ Assert.assertEquals(k + "_11", cur.get(1));
+ Assert.assertEquals(k + "_12", cur.get(2));
+ Assert.assertEquals(null, cur.get(3));
+ }
+
+ // second table
+ if (k >= 10) {
+ t++;
+ }
+ if (t <= 9 && t >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(t + "_00", cur.get(0));
+ Assert.assertEquals(t + "_01", cur.get(1));
+ Assert.assertEquals(null, cur.get(2));
+ Assert.assertEquals(t + "_02", cur.get(3));
+ }
+ if (t >= 10) {
+ j++;
+ }
+ if (j <= 9 && j >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(j + "_10", cur.get(0));
+ Assert.assertEquals(j + "_11", cur.get(1));
+ Assert.assertEquals(null, cur.get(2));
+ Assert.assertEquals(j + "_12", cur.get(3));
+ }
+ i++;
+ }// while
+ Assert.assertEquals(40, i);
+ }
+
+ public void testReaderThroughIO() throws ExecException, IOException,
+ ParseException {
+
+ String projection1 = new String("a,b,c");
+ BasicTable.Reader reader = new BasicTable.Reader(pathTable1, conf);
+ reader.setProjection(projection1);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ // Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+
+ System.out.println("read record or record:" + RowValue.toString());
+
+ for (int i = 0; i <= 9; i++) {
+ scanner.getValue(RowValue);
+ System.out.println("read record or record:" + RowValue.toString());
+ Assert.assertEquals(i + "_00", RowValue.get(0));
+ Assert.assertEquals(i + "_01", RowValue.get(1));
+ Assert.assertEquals(i + "_02", RowValue.get(2));
+ scanner.advance();
+ }
+ for (int i = 0; i <= 9; i++) {
+ scanner.getValue(RowValue);
+ System.out.println("read record or record:" + RowValue.toString());
+ Assert.assertEquals(i + "_10", RowValue.get(0));
+ Assert.assertEquals(i + "_11", RowValue.get(1));
+ Assert.assertEquals(i + "_12", RowValue.get(2));
+ scanner.advance();
+ }
+
+ reader.close();
+ }
+
+ // all fields
+ public void testReader2() throws ExecException, IOException {
+
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str2 = pathTable2.toString().substring(
+ pathTable2.toString().indexOf("/", 7), pathTable2.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('c');";
+ System.out.println(query);
+
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int i = 0;
+ int k = -1;
+ Tuple cur = null;
+ int t = -1;
+ int j = -1;
+
+ while (it.hasNext()) {
+ cur = it.next();
+
+ System.out.println("cur: " + cur);
+ if (i <= 9) {
+ System.out.println("first table first part: " + cur.toString());
+
+ Assert.assertEquals(i + "_02", cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (i >= 10) {
+ k++;
+ }
+ if (k <= 9 && k >= 0) {
+ System.out.println("first table second part: : " + cur.toString());
+ Assert.assertEquals(k + "_12", cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (k >= 10) {
+ t++;
+ }
+ if (t <= 9 && t >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(null, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (t >= 10) {
+ j++;
+ }
+ if (j <= 9 && j >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+
+ Assert.assertEquals(null, cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ i++;
+ }// while
+ Assert.assertEquals(40, i);
+ }
+
+ // projection for column a, which exists in both tables
+ public void testReader3() throws ExecException, IOException {
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str2 = pathTable2.toString().substring(
+ pathTable2.toString().indexOf("/", 7), pathTable2.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a');";
+ System.out.println(query);
+
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int i = 0;
+ int k = -1;
+ Tuple cur = null;
+ int t = -1;
+ int j = -1;
+
+ while (it.hasNext()) {
+ cur = it.next();
+
+ System.out.println("cur: " + cur);
+ // first table
+ if (i <= 9) {
+ System.out.println("first table first part: " + cur.toString());
+ Assert.assertEquals(i + "_00", cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (i >= 10) {
+ k++;
+ }
+ if (k <= 9 && k >= 0) {
+ System.out.println("first table second part: : " + cur.toString());
+ Assert.assertEquals(k + "_10", cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // second table
+ if (k >= 10) {
+ t++;
+ }
+ if (t <= 9 && t >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(t + "_00", cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (t >= 10) {
+ j++;
+ }
+ if (j <= 9 && j >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(j + "_10", cur.get(0));
+ try {
+ cur.get(1);
+ Assert.fail("should throw index out of bound exception ");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ i++;
+ }// while
+ Assert.assertEquals(40, i);
+ }
+
+ // some common fields
+ public void testReader4() throws ExecException, IOException {
+
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str2 = pathTable2.toString().substring(
+ pathTable2.toString().indexOf("/", 7), pathTable2.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b');";
+ System.out.println(query);
+ // records = LOAD '/user/jing1234/1,/user/jing1234/2' USING
+ // org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d');
+
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int i = 0;
+ int k = -1;
+ Tuple cur = null;
+ int t = -1;
+ int j = -1;
+
+ while (it.hasNext()) {
+ cur = it.next();
+
+ System.out.println("cur: " + cur);
+ // first table
+ if (i <= 9) {
+ System.out.println("first table first part: " + cur.toString());
+ Assert.assertEquals(i + "_00", cur.get(0));
+ Assert.assertEquals(i + "_01", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ if (i >= 10) {
+ k++;
+ }
+ if (k <= 9 && k >= 0) {
+ System.out.println("first table second part: : " + cur.toString());
+ Assert.assertEquals(k + "_10", cur.get(0));
+ Assert.assertEquals(k + "_11", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // second table
+ if (k >= 10) {
+ t++;
+ }
+ if (t <= 9 && t >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(t + "_00", cur.get(0));
+ Assert.assertEquals(t + "_01", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (t >= 10) {
+ j++;
+ }
+ if (j <= 9 && j >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(j + "_10", cur.get(0));
+ Assert.assertEquals(j + "_11", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ i++;
+ }// while
+ Assert.assertEquals(40, i);
+ }
+
+ // common columns, but at different positions
+ public void testReader5() throws ExecException, IOException {
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str2 = pathTable2.toString().substring(
+ pathTable2.toString().indexOf("/", 7), pathTable2.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('e, f');";
+ System.out.println(query);
+ // records = LOAD '/user/jing1234/1,/user/jing1234/2' USING
+ // org.apache.hadoop.zebra.pig.TableLoader('e, f');
+
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int i = 0;
+ int k = -1;
+ Tuple cur = null;
+ int t = -1;
+ int j = -1;
+
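+ // Projection is by column name, so 'e' and 'f' resolve to different physical
+ // positions in each source table; hence the expected value suffixes below
+ // differ between rows coming from the first and the second table.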
+ while (it.hasNext()) {
+ cur = it.next();
+
+ System.out.println("cur: " + cur);
+ // first table
+ if (i <= 9) {
+ System.out.println("first table first part: " + cur.toString());
+ Assert.assertEquals(i + "_03", cur.get(0));
+ Assert.assertEquals(i + "_04", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw out of index bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (i >= 10) {
+ k++;
+ }
+ if (k <= 9 && k >= 0) {
+ System.out.println("first table second part: : " + cur.toString());
+ Assert.assertEquals(k + "_13", cur.get(0));
+ Assert.assertEquals(k + "_14", cur.get(1));
+
+ }
+
+ // second table
+ if (k >= 10) {
+ t++;
+ }
+ if (t <= 9 && t >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(t + "_04", cur.get(0));
+ Assert.assertEquals(t + "_03", cur.get(1));
+ }
+ if (t >= 10) {
+ j++;
+ }
+ if (j <= 9 && j >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(j + "_14", cur.get(0));
+ Assert.assertEquals(j + "_13", cur.get(1));
+ }
+ i++;
+ }// while
+ Assert.assertEquals(40, i);
+ }
+
+ @Test
+ // union two tables with different column numbers and column positions
+ public void testReader6() throws ExecException, IOException {
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str5 = pathTable5.toString().substring(
+ pathTable5.toString().indexOf("/", 7), pathTable5.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str5
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('b,a');";
+ System.out.println(query);
+
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int i = 0;
+ int k = -1;
+ Tuple cur = null;
+ int t = -1;
+ int j = -1;
+
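+ // The projection order 'b,a' drives the output column order, so column b's
+ // value (suffix _x1) is expected before column a's value (suffix _x0).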
+ while (it.hasNext()) {
+ cur = it.next();
+
+ System.out.println("cur: " + cur);
+ // first table
+ if (i <= 9) {
+ System.out.println("first table first part: " + cur.toString());
+ Assert.assertEquals(i + "_01", cur.get(0));
+ Assert.assertEquals(i + "_00", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ if (i >= 10) {
+ k++;
+ }
+ if (k <= 9 && k >= 0) {
+ System.out.println("first table second part: : " + cur.toString());
+ Assert.assertEquals(k + "_11", cur.get(0));
+ Assert.assertEquals(k + "_10", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ // second table
+ if (k >= 10) {
+ t++;
+ }
+ if (t <= 9 && t >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(t + "_01", cur.get(0));
+ Assert.assertEquals(t + "_00", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ if (t >= 10) {
+ j++;
+ }
+ if (j <= 9 && j >= 0) {
+ System.out.println("second table first part: " + cur.toString());
+ Assert.assertEquals(j + "_11", cur.get(0));
+ Assert.assertEquals(j + "_10", cur.get(1));
+ try {
+ cur.get(2);
+ Assert.fail("should throw index out of bound exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ i++;
+ }// while
+ Assert.assertEquals(40, i);
+ }
+
+ // both paths are full hdfs:// URIs
+ public void testNeg1() throws ExecException, IOException {
+ String str1 = pathTable1.toString();
+ String str2 = pathTable2.toString();
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a,b,c,d');";
+ System.out.println(query);
+ // records = LOAD
+ // 'hdfs://localhost.localdomain:39125/user/jing1234/1,hdfs://localhost.localdomain:39125/user/jing1234/2'
+ // USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d');
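+ // Unlike the other tests, the full hdfs:// URIs are passed to TableLoader;
+ // the union is still expected to return all 40 rows.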
+ pigServer.registerQuery(query);
+
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int cnt = 0;
+ Tuple cur = null;
+ while (it.hasNext()) {
+ cur = it.next();
+ System.out.println(cur);
+ cnt++;
+ if (cnt == 1) {
+ Assert.assertEquals("0_00", cur.get(0));
+ Assert.assertEquals("0_01", cur.get(1));
+ Assert.assertEquals("0_02", cur.get(2));
+ Assert.assertEquals(null, cur.get(3));
+ }
+ if (cnt == 21) {
+ Assert.assertEquals("0_00", cur.get(0));
+ Assert.assertEquals("0_01", cur.get(1));
+ Assert.assertEquals(null, cur.get(2));
+ Assert.assertEquals("0_02", cur.get(3));
+ }
+ }
+ Assert.assertEquals(40, cnt);
+ }
+
+ // non-existing column
+ public void testNeg2() throws ExecException, IOException {
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str2 = pathTable2.toString().substring(
+ pathTable2.toString().indexOf("/", 7), pathTable2.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a,f');";
+
+ System.out.println(query);
+ // records = LOAD '/user/jing1234/1,/user/jing1234/2' USING
+ // org.apache.hadoop.zebra.pig.TableLoader('a,f');
+ pigServer.registerQuery(query);
+
+ Iterator<Tuple> it = pigServer.openIterator("records");
+
+ int cnt = 0;
+ Tuple cur = null;
+ while (it.hasNext()) {
+ cur = it.next();
+ System.out.println(cur);
+ cnt++;
+ if (cnt == 1) {
+ Assert.assertEquals("0_00", cur.get(0));
+ Assert.assertEquals("0_01", cur.get(1));
+ Assert.assertEquals("0_02", cur.get(2));
+ Assert.assertEquals(null, cur.get(3));
+ }
+ if (cnt == 21) {
+ Assert.assertEquals("0_00", cur.get(0));
+ Assert.assertEquals("0_01", cur.get(1));
+ Assert.assertEquals(null, cur.get(2));
+ Assert.assertEquals("0_02", cur.get(3));
+ }
+ }
+ Assert.assertEquals(40, cnt);
+ }
+
+ @Test
+ // two tables with the same column name but different types and positions;
+ // should throw an exception
+ public void testNeg3() throws ExecException, IOException {
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str3 = pathTable3.toString().substring(
+ pathTable3.toString().indexOf("/", 7), pathTable3.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str3
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b');";
+ System.out.println(query);
+ // records = LOAD '/user/jing1234/1,/user/jing1234/3' USING
+ // org.apache.hadoop.zebra.pig.TableLoader('a,b');
+ try {
+ pigServer.registerQuery(query);
+ Assert.fail("should throw exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ // union table1 and table4; they have the same column names with different
+ // types, so the union should throw an exception
+ public void testNeg4() throws ExecException, IOException {
+
+ String str1 = pathTable1.toString().substring(
+ pathTable1.toString().indexOf("/", 7), pathTable1.toString().length());
+ String str4 = pathTable4.toString().substring(
+ pathTable4.toString().indexOf("/", 7), pathTable4.toString().length());
+ String query = "records = LOAD '" + str1 + "," + str4
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c');";
+ System.out.println(query);
+ try {
+ pigServer.registerQuery(query);
+ Assert.fail("should throw exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin1.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin1.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin1.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ TestBasicUnion.class
+ //TestMapTableLoader.class
+ //TestMapTableStorer.class,
+ //TestTableLoader.class,
+ //TestTableStorer.class
+})
+
+public class TestCheckin1 {
+ // the class remains completely empty,
+ // being used only as a holder for the above annotations
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin2.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin2.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin2.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ //TestBasicUnion.class,
+ TestMapTableLoader.class
+ //TestMapTableStorer.class,
+ //TestTableLoader.class,
+ //TestTableStorer.class
+})
+
+public class TestCheckin2 {
+ // the class remains completely empty,
+ // being used only as a holder for the above annotations
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin3.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin3.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin3.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin3.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ //TestBasicUnion.class,
+ //TestMapTableLoader.class
+ TestMapTableStorer.class
+ //TestTableLoader.class,
+ //TestTableStorer.class
+})
+
+public class TestCheckin3 {
+ // the class remains completely empty,
+ // being used only as a holder for the above annotations
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin4.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin4.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin4.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin4.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ //TestBasicUnion.class,
+ //TestMapTableLoader.class
+ //TestMapTableStorer.class,
+ TestTableLoader.class
+ //TestTableStorer.class
+})
+
+public class TestCheckin4 {
+ // the class remains completely empty,
+ // being used only as a holder for the above annotations
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin5.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin5.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin5.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCheckin5.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ //TestBasicUnion.class,
+ //TestMapTableLoader.class
+ //TestMapTableStorer.class,
+ //TestTableLoader.class,
+ TestTableStorer.class
+})
+
+public class TestCheckin5 {
+ // the class remains completely empty,
+ // being used only as a holder for the above annotations
+}