Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/17 01:31:45 UTC
svn commit: r910786 - in
/hadoop/pig/branches/load-store-redesign/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/io/ src/test/org/apache/hadoop/zebra/io/
src/test/org/apache/hadoop/zebra/mapreduce/
Author: yanz
Date: Wed Feb 17 00:31:44 2010
New Revision: 910786
URL: http://svn.apache.org/viewvc?rev=910786&view=rev
Log:
PIG-1115: cleanup of temp files left by failed tasks (gauravj via yanz)
Added:
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java
Modified:
hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
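For context, the patch adopts the staging pattern the diffs below implement: writers create their files under a _temporary subdirectory of the destination and rename finished files into the final location, so anything a failed task leaves behind is confined to one well-known directory that close() can sweep away. A minimal standalone sketch of that pattern against the Hadoop FileSystem API (illustrative class and method names, not Zebra's actual code):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagedWriteSketch {
  // Stage a part file under <out>/_temporary, then promote it. A task
  // that dies mid-write only ever pollutes the _temporary directory.
  public static void writePart(Path out, String name, byte[] data,
      Configuration conf) throws IOException {
    FileSystem fs = out.getFileSystem(conf);
    Path tmpFile = new Path(new Path(out, "_temporary"), name);
    FSDataOutputStream os = fs.create(tmpFile);
    try {
      os.write(data);
    } finally {
      os.close();
    }
    // Only a fully written file reaches the final location.
    if (!fs.rename(tmpFile, new Path(out, name))) {
      throw new IOException("Failed to promote " + tmpFile);
    }
  }
}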
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt?rev=910786&r1=910785&r2=910786&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt Wed Feb 17 00:31:44 2010
@@ -54,6 +54,8 @@
BUG FIXES
+ PIG-1115: cleanup of temp files left by failed tasks (gauravj via yanz)
+
PIG-1167: Hadoop file glob support (yanz)
PIG-1153: Record split exception fix (yanz)
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=910786&r1=910785&r2=910786&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Wed Feb 17 00:31:44 2010
@@ -1225,6 +1225,10 @@
boolean sorted;
private boolean finished;
Tuple[] cgTuples;
+ private Path actualOutputPath;
+ private Configuration writerConf;
+
+
/**
* Create a BasicTable writer. The semantics are as follows:
@@ -1262,6 +1266,8 @@
public Writer(Path path, String btSchemaString, String btStorageString, String sortColumns,
String comparator, Configuration conf) throws IOException {
try {
+ actualOutputPath = path;
+ writerConf = conf;
schemaFile =
new SchemaFile(path, btSchemaString, btStorageString, sortColumns,
comparator, conf);
@@ -1337,15 +1343,20 @@
*/
public Writer(Path path, Configuration conf) throws IOException {
try {
+ actualOutputPath = path;
+ writerConf = conf;
schemaFile = new SchemaFile(path, conf);
int numCGs = schemaFile.getNumOfPhysicalSchemas();
partition = schemaFile.getPartition();
sorted = schemaFile.isSorted();
colGroups = new ColumnGroup.Writer[numCGs];
cgTuples = new Tuple[numCGs];
+ Path tmpWorkPath = new Path(path, "_temporary");
for (int nx = 0; nx < numCGs; nx++) {
colGroups[nx] =
- new ColumnGroup.Writer(new Path(path, partition.getCGSchema(nx).getName()),
+ new ColumnGroup.Writer(
+ new Path(path, partition.getCGSchema(nx).getName()),
+ new Path(tmpWorkPath, partition.getCGSchema(nx).getName()),
conf);
cgTuples[nx] = TypesUtils.createTuple(colGroups[nx].getSchema());
}
@@ -1420,6 +1431,7 @@
*/
@Override
public void close() throws IOException {
+ cleanupTempDir();
if (closed) return;
closed = true;
if (!finished)
@@ -1449,6 +1461,23 @@
}
}
}
+
+ /**
+ * Removes the temporary directory
+ * $path/_temporary used to stage intermediate data
+ * during record writing.
+ */
+
+ private void cleanupTempDir() throws IOException {
+ FileSystem fileSys = actualOutputPath.getFileSystem(writerConf);
+ Path pathToRemove = new Path(actualOutputPath, "_temporary");
+ if (fileSys.exists(pathToRemove)) {
+ if(!fileSys.delete(pathToRemove, true)) {
+ LOG.error("Failed to delete the temporary output" +
+ " directory: " + pathToRemove.toString());
+ }
+ }
+ }
/**
* Get the schema of the table.
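In isolation, the cleanup added above reduces to: resolve the FileSystem for the table path and recursively delete <path>/_temporary if present, logging rather than failing when the delete does not succeed. A standalone equivalent, as a sketch (the committed method lives inside BasicTable.Writer and uses the writer's saved actualOutputPath and writerConf):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TempDirCleanupSketch {
  // Remove <table>/_temporary recursively; a failed delete is reported
  // but does not abort the caller, matching the behavior added above.
  static void cleanupTempDir(Path table, Configuration conf)
      throws IOException {
    FileSystem fs = table.getFileSystem(conf);
    Path tmp = new Path(table, "_temporary");
    if (fs.exists(tmp) && !fs.delete(tmp, true)) {
      System.err.println("Failed to delete temporary output directory: " + tmp);
    }
  }
}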
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=910786&r1=910785&r2=910786&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Wed Feb 17 00:31:44 2010
@@ -1280,6 +1280,7 @@
*/
public static class Writer implements Closeable {
Path path;
+ Path finalOutputPath;
Configuration conf;
FileSystem fs;
CGSchema cgschema;
@@ -1350,6 +1351,7 @@
throws IOException, ParseException {
this.path = path;
this.conf = conf;
+ this.finalOutputPath = path;
fs = path.getFileSystem(conf);
@@ -1379,6 +1381,25 @@
}
/**
+ * Reopen an already created ColumnGroup for writing. It accepts a
+ * temporary work path for the column group where the CGInserter can
+ * write.
+ * RuntimeException will be thrown if the table is already closed,
+ * or if createMetaBlock() is called by some other process.
+ */
+ public Writer(Path finalPath, Path workPath, Configuration conf) throws IOException,
+ ParseException {
+ this.path = workPath;
+ finalOutputPath = finalPath;
+ this.conf = conf;
+ fs = path.getFileSystem(conf);
+ checkPath(finalOutputPath, false);
+ checkPath(path, true);
+ checkMetaFile(finalOutputPath);
+ cgschema = CGSchema.load(fs, finalOutputPath);
+
+ }
+
+ /**
* Reopen an already created ColumnGroup for writing. RuntimeException will
* be thrown if the table is already closed, or if createMetaBlock() is
* called by some other process.
@@ -1386,6 +1407,7 @@
public Writer(Path path, Configuration conf) throws IOException,
ParseException {
this.path = path;
+ finalOutputPath = path;
this.conf = conf;
fs = path.getFileSystem(conf);
checkPath(path, false);
@@ -1444,9 +1466,9 @@
private void createIndex() throws IOException {
MetaFile.Writer metaFile =
- MetaFile.createWriter(makeMetaFilePath(path), conf);
+ MetaFile.createWriter(makeMetaFilePath(finalOutputPath), conf);
if (cgschema.isSorted()) {
- CGIndex index = buildIndex(fs, path, false, conf);
+ CGIndex index = buildIndex(fs, finalOutputPath, false, conf);
DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
try {
index.write(dos);
@@ -1629,7 +1651,9 @@
out.close();
out = null;
// do renaming only if all the above is successful.
- fs.rename(new Path(path, tmpName), new Path(path, name));
+// fs.rename(new Path(path, tmpName), new Path(path, name));
+ fs.rename(new Path(path, tmpName), new Path(finalOutputPath, name));
+
/*
if(cgschema.getOwner() != null || cgschema.getGroup() != null) {
fs.setOwner(new Path(path, name), cgschema.getOwner(), cgschema.getGroup());
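The calling convention for the new two-path constructor, as the tests below exercise it: the work path receives the in-progress TFiles, and each finished file is renamed into the final output path, which also holds the index and meta file. Roughly, as a sketch (paths are examples; error handling omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.zebra.io.ColumnGroup;
import org.apache.hadoop.zebra.io.TableInserter;

public class WorkPathWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path cgPath = new Path("/user/me/table/CG0");              // final location
    Path workPath = new Path("/user/me/table/_temporary/CG0"); // staging area
    // Reopen an existing column group; TFiles are staged under workPath
    // and renamed into cgPath as they are finished.
    ColumnGroup.Writer writer = new ColumnGroup.Writer(cgPath, workPath, conf);
    TableInserter inserter = writer.getInserter("part-000000", true);
    // inserter.insert(key, tuple); ... then:
    inserter.close();
    // As in the tests below, a final reopen-and-close builds the index:
    new ColumnGroup.Writer(cgPath, conf).close();
  }
}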
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java?rev=910786&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java Wed Feb 17 00:31:44 2010
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.KeyDistribution;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests ColumnGroup APIs as if they were called from MapReduce jobs.
+ */
+public class TestColumnGroupWithWorkPath {
+ static Configuration conf;
+ static Random random;
+ static Path rootPath;
+ static FileSystem fs;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ conf = new Configuration();
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+ conf.set("io.compression.codec.lzo.class", "no");
+ random = new Random(System.nanoTime());
+ rootPath = new Path(System.getProperty("test.build.data",
+ "build/test/data/workdir3"));
+ fs = rootPath.getFileSystem(conf);
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ }
+
+ BytesWritable makeRandomKey(int max) {
+ return makeKey(random.nextInt(max));
+ }
+
+ static BytesWritable makeKey(int i) {
+ return new BytesWritable(String.format("key%09d", i).getBytes());
+ }
+
+ String makeString(String prefix, int max) {
+ return String.format("%s%09d", prefix, random.nextInt(max));
+ }
+
+ int createCG(int parts, int rows, String strSchema, Path path,
+ boolean properClose, boolean sorted, int[] emptyTFiles)
+ throws IOException, ParseException {
+ if (fs.exists(path)) {
+ ColumnGroup.drop(path, conf);
+ }
+
+ Set<Integer> emptyTFileSet = new HashSet<Integer>();
+ if (emptyTFiles != null) {
+ for (int i = 0; i < emptyTFiles.length; ++i) {
+ emptyTFileSet.add(emptyTFiles[i]);
+ }
+ }
+
+ ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, sorted, path.getName(),
+ "pig", "gz", "root", null, (short) Short.parseShort("755", 8), false, conf);
+
+ writer.finish();
+
+ int total = 0;
+ Schema schema = new Schema(strSchema);
+ String colNames[] = schema.getColumns();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ int[] permutation = new int[parts];
+ for (int i = 0; i < parts; ++i) {
+ permutation[i] = i;
+ }
+
+ for (int i = parts - 1; i > 0; --i) {
+ int targetIndex = random.nextInt(i + 1);
+ int tmp = permutation[i];
+ permutation[i] = permutation[targetIndex];
+ permutation[targetIndex] = tmp;
+ }
+
+ for (int i = 0; i < parts; ++i) {
+ Path workPath = new Path(path.getParent(), "_temporary");
+ writer = new ColumnGroup.Writer(path, workPath, conf);
+ TableInserter inserter = writer.getInserter(String.format("part-%06d",
+ permutation[i]), true);
+ if ((rows > 0) && !emptyTFileSet.contains(permutation[i])) {
+ int actualRows = random.nextInt(rows) + rows / 2;
+ for (int j = 0; j < actualRows; ++j, ++total) {
+ BytesWritable key;
+ if (!sorted) {
+ key = makeRandomKey(rows * 10);
+ } else {
+ key = makeKey(total);
+ }
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserter.insert(key, tuple);
+ }
+ }
+ inserter.close();
+ }
+
+ if (properClose) {
+ writer = new ColumnGroup.Writer(path, conf);
+ writer.close();
+ /* We can only test the number of rows on sorted tables. */
+ if (sorted) {
+ BasicTableStatus status = getStatus(path);
+ Assert.assertEquals(total, status.getRows());
+ }
+ }
+
+ return total;
+ }
+
+ static class DupKeyGen {
+ int low, high;
+ int current;
+ boolean grow = true;
+ int index = 0;
+ int count = 0;
+
+ DupKeyGen(int low, int high) {
+ this.low = Math.max(10, low);
+ this.high = Math.max(this.low * 2, high);
+ current = this.low;
+ }
+
+ BytesWritable next() {
+ if (count == 0) {
+ count = nextCount();
+ ++index;
+ }
+ --count;
+ return makeKey(index);
+ }
+
+ int nextCount() {
+ int ret = current;
+ if ((grow && current > high) || (!grow && current < low)) {
+ grow = !grow;
+ }
+ if (grow) {
+ current *= 2;
+ } else {
+ current /= 2;
+ }
+ return ret;
+ }
+ }
+
+ int createCGDupKeys(int parts, int rows, String strSchema, Path path)
+ throws IOException, ParseException {
+ if (fs.exists(path)) {
+ ColumnGroup.drop(path, conf);
+ }
+
+ ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, true, path.getName(),
+ "pig", "gz", "root", null, (short) Short.parseShort("777", 8), false, conf);
+ writer.finish();
+
+ int total = 0;
+ DupKeyGen keyGen = new DupKeyGen(10, rows * 3);
+ Schema schema = new Schema(strSchema);
+ String colNames[] = schema.getColumns();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ int[] permutation = new int[parts];
+ for (int i = 0; i < parts; ++i) {
+ permutation[i] = i;
+ }
+
+ for (int i = parts - 1; i > 0; --i) {
+ int targetIndex = random.nextInt(i + 1);
+ int tmp = permutation[i];
+ permutation[i] = permutation[targetIndex];
+ permutation[targetIndex] = tmp;
+ }
+
+ for (int i = 0; i < parts; ++i) {
+ writer = new ColumnGroup.Writer(path, conf);
+ TableInserter inserter = writer.getInserter(String.format("part-%06d",
+ permutation[i]), true);
+ if (rows > 0) {
+ int actualRows = random.nextInt(rows * 2 / 3) + rows * 2 / 3;
+ for (int j = 0; j < actualRows; ++j, ++total) {
+ BytesWritable key = keyGen.next();
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserter.insert(key, tuple);
+ }
+ }
+ inserter.close();
+ }
+
+ writer = new ColumnGroup.Writer(path, conf);
+ writer.close();
+ BasicTableStatus status = getStatus(path);
+ Assert.assertEquals(total, status.getRows());
+
+ return total;
+ }
+
+ void rangeSplitCG(int numSplits, int totalRows, String strProjection,
+ Path path) throws IOException, ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ long totalBytes = reader.getStatus().getSize();
+
+ List<CGRangeSplit> splits = reader.rangeSplit(numSplits);
+ reader.close();
+ int total = 0;
+ for (int i = 0; i < splits.size(); ++i) {
+ reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ total += doReadOnly(reader.getScanner(splits.get(i), true));
+ totalBytes -= reader.getBlockDistribution(splits.get(i)).getLength();
+ }
+ Assert.assertEquals(total, totalRows);
+ Assert.assertEquals(totalBytes, 0L);
+ }
+
+ void doRangeSplit(int[] numSplits, int totalRows, String projection, Path path)
+ throws IOException, ParseException {
+ for (int i : numSplits) {
+ if (i > 0) {
+ rangeSplitCG(i, totalRows, projection, path);
+ }
+ }
+ }
+
+ void keySplitCG(int numSplits, int totalRows, String strProjection, Path path)
+ throws IOException, ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ long totalBytes = reader.getStatus().getSize();
+ KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
+ Assert.assertEquals(totalBytes, keyDistri.length());
+ reader.close();
+ BytesWritable[] keys = null;
+ if (keyDistri.size() >= numSplits) {
+ keyDistri.resize(numSplits);
+ Assert.assertEquals(totalBytes, keyDistri.length());
+ RawComparable[] rawComparables = keyDistri.getKeys();
+ keys = new BytesWritable[rawComparables.length];
+ for (int i = 0; i < keys.length; ++i) {
+ keys[i] = new BytesWritable();
+ keys[i].setSize(rawComparables[i].size());
+ System.arraycopy(rawComparables[i].buffer(),
+ rawComparables[i].offset(), keys[i].get(), 0, rawComparables[i]
+ .size());
+ }
+ } else {
+ int targetSize = Math.min(totalRows / 10, numSplits);
+ // revert to manually cooked up keys.
+ Set<Integer> keySets = new TreeSet<Integer>();
+ while (keySets.size() < targetSize) {
+ keySets.add(random.nextInt(totalRows));
+ }
+ keys = new BytesWritable[targetSize];
+ if (!keySets.isEmpty()) {
+ int j = 0;
+ for (int i : keySets.toArray(new Integer[keySets.size()])) {
+ keys[j] = makeKey(i);
+ ++j;
+ }
+ }
+ }
+
+ int total = 0;
+ for (int i = 0; i < keys.length; ++i) {
+ reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection(strProjection);
+ BytesWritable begin = (i == 0) ? null : keys[i - 1];
+ BytesWritable end = (i == keys.length - 1) ? null : keys[i];
+ total += doReadOnly(reader.getScanner(begin, end, true));
+ }
+ Assert.assertEquals(total, totalRows);
+ }
+
+ void doKeySplit(int[] numSplits, int totalRows, String projection, Path path)
+ throws IOException, ParseException {
+ for (int i : numSplits) {
+ if (i > 0) {
+ keySplitCG(i, totalRows, projection, path);
+ }
+ }
+ }
+
+ BasicTableStatus getStatus(Path path) throws IOException, ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ try {
+ return reader.getStatus();
+ } finally {
+ reader.close();
+ }
+ }
+
+ void doReadWrite(Path path, int parts, int rows, String schema,
+ String projection, boolean properClose, boolean sorted, int[] emptyTFiles)
+ throws IOException, ParseException {
+ int totalRows = createCG(parts, rows, schema, path, properClose, sorted,
+ emptyTFiles);
+ if (rows == 0) {
+ Assert.assertEquals(rows, 0);
+ }
+
+ doRangeSplit(new int[] { 1, 2, parts / 2, parts, 2 * parts }, totalRows,
+ projection, path);
+ if (sorted) {
+ doKeySplit(new int[] { 1, 2, parts / 2, parts, 2 * parts, 10 * parts },
+ totalRows, projection, path);
+ }
+ }
+
+ int doReadOnly(TableScanner scanner) throws IOException, ParseException {
+ int total = 0;
+ BytesWritable key = new BytesWritable();
+ Tuple value = TypesUtils.createTuple(scanner.getSchema());
+ for (; !scanner.atEnd(); scanner.advance()) {
+ ++total;
+ switch (random.nextInt() % 4) {
+ case 0:
+ scanner.getKey(key);
+ break;
+ case 1:
+ scanner.getValue(value);
+ break;
+ case 2:
+ scanner.getKey(key);
+ scanner.getValue(value);
+ break;
+ default: // no-op.
+ }
+ }
+ scanner.close();
+
+ return total;
+ }
+
+ @Test
+ public void testNullSplits() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupNullSplits");
+ int totalRows = createCG(2, 10, "a, b, c", path, true, true, null);
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ reader.setProjection("a,d,c,f");
+ Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
+ Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, null,
+ false)));
+ reader.close();
+ }
+
+ @Test
+ public void testNegativeSplits() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestNegativeSplits");
+ int totalRows = createCG(2, 100, "a, b, c", path, true, true, null);
+ rangeSplitCG(-1, totalRows, "a,d,c,f", path);
+ }
+
+ @Test
+ public void testEmptyCG() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
+ doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", false, false, null);
+ doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
+ doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
+ }
+
+ @Test
+ public void testEmptyTFiles() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
+ doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", false, false, null);
+ doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
+ doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);
+ }
+
+ public void testNormalCases() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupNormal");
+ doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
+ doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
+ doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
+ }
+
+ @Test
+ public void testSomeEmptyTFiles() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
+ for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
+ doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
+ emptyTFiles);
+ doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
+ emptyTFiles);
+ doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,
+ emptyTFiles);
+ }
+ }
+
+ int countRows(Path path, String projection) throws IOException,
+ ParseException {
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+ if (projection != null) {
+ reader.setProjection(projection);
+ }
+ int totalRows = 0;
+ TableScanner scanner = reader.getScanner(null, true);
+ for (; !scanner.atEnd(); scanner.advance()) {
+ ++totalRows;
+ }
+ scanner.close();
+ return totalRows;
+ }
+
+ @Test
+ public void testProjection() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupProjection");
+ int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+ Assert.assertEquals(totalRows, countRows(path, null));
+ Assert.assertEquals(totalRows, countRows(path, ""));
+ }
+
+ @Test
+ public void testDuplicateKeys() throws IOException, ParseException {
+ Path path = new Path(rootPath, "TestColumnGroupDuplicateKeys");
+ int totalRows = createCGDupKeys(2, 250, "a, b, c", path);
+ doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
+ path);
+ }
+
+ @Test
+ public void testSortedCGKeySplit() throws IOException, ParseException {
+ conf.setInt("table.output.tfile.minBlock.size", 640 * 1024);
+ Path path = new Path(rootPath, "TestSortedCGKeySplit");
+ int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+ doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
+ path);
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java?rev=910786&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java Wed Feb 17 00:31:44 2010
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
+import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This is a complete MR sample for Table. It doesn't contain the
+ * 'read' part, but that should be similar and easier to write. Refer to
+ * the test cases in the same directory.
+ *
+ * Assume the input files contain rows of word and count, separated by a space:
+ *
+ * <pre>
+ * us 2
+ * japan 2
+ * india 4
+ * us 2
+ * japan 1
+ * india 3
+ * nouse 5
+ * nowhere 4
+ * </pre>
+ */
+public class TestTempDirRemoval {
+
+ static String inputPath;
+ static String inputFileName = "multi-input.txt";
+ protected static ExecType execType = ExecType.LOCAL;
+ private static MiniCluster cluster;
+ protected static PigServer pigServer;
+ // private static Path pathWorking, pathTable1, path2, path3,
+ // pathTable4, pathTable5;
+ private static Configuration conf;
+ public static String sortKey = null;
+
+ private static FileSystem fs;
+
+ private static String zebraJar;
+ private static String whichCluster;
+ private static String multiLocs;
+ private static String strTable1 = null;
+ private static String strTable2 = null;
+ private static String strTable3 = null;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ if (System.getenv("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System
+ .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (System.getProperty("whichCluster") == null) {
+ System.setProperty("whichCluster", "miniCluster");
+ System.out.println("should be called");
+ whichCluster = System.getProperty("whichCluster");
+ } else {
+ whichCluster = System.getProperty("whichCluster");
+ }
+
+ System.out.println("clusterddddd: " + whichCluster);
+ System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
+ System.out.println(" get env user name: " + System.getenv("USER"));
+ if ((whichCluster.equalsIgnoreCase("realCluster") && System
+ .getenv("HADOOP_HOME") == null)) {
+ System.out.println("Please set HADOOP_HOME");
+ System.exit(0);
+ }
+
+ conf = new Configuration();
+
+ if ((whichCluster.equalsIgnoreCase("realCluster") && System.getenv("USER") == null)) {
+ System.out.println("Please set USER");
+ System.exit(0);
+ }
+ zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
+
+ File file = new File(zebraJar);
+ if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+ System.out.println("Please put zebra.jar at hadoop_home/lib");
+ System.exit(0);
+ }
+
+ // set inputPath and output path
+ String workingDir = null;
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ inputPath = new String("/user/" + System.getenv("USER") + "/"
+ + inputFileName);
+ System.out.println("inputPath: " + inputPath);
+ multiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+ + "," + "/user/" + System.getenv("USER") + "/" + "india" + ","
+ + "/user/" + System.getenv("USER") + "/" + "japan");
+ fs = new Path(inputPath).getFileSystem(conf);
+
+ } else {
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+ inputPath = new String(workingDir + "/" + inputFileName);
+ System.out.println("inputPath: " + inputPath);
+ multiLocs = new String(workingDir + "/" + "us" + "," + workingDir + "/"
+ + "india" + "," + workingDir + "/" + "japan");
+ }
+ writeToFile(inputPath);
+ // check inputPath existence
+ File inputFile = new File(inputPath);
+ if (!inputFile.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+ System.out.println("Please put inputFile in hdfs: " + inputPath);
+ // System.exit(0);
+ }
+ if (!inputFile.exists() && whichCluster.equalsIgnoreCase("miniCluster")) {
+ System.out
+ .println("Please put inputFile under workingdir. working dir is : "
+ + workingDir);
+ System.exit(0);
+ }
+
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+ .toProperties(conf));
+ pigServer.registerJar(zebraJar);
+
+ }
+
+ if (whichCluster.equalsIgnoreCase("miniCluster")) {
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ fs = cluster.getFileSystem();
+
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+ }
+ }
+
+ public String getCurrentMethodName() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ (new Throwable()).printStackTrace(pw);
+ pw.flush();
+ String stackTrace = baos.toString();
+ pw.close();
+
+ StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+ tok.nextToken(); // 'java.lang.Throwable'
+ tok.nextToken(); // 'at ...getCurrentMethodName'
+ String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+ // Parse line 3
+ tok = new StringTokenizer(l.trim(), " <(");
+ String t = tok.nextToken(); // 'at'
+ t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+ StringTokenizer st = new StringTokenizer(t, ".");
+ String methodName = null;
+ while (st.hasMoreTokens()) {
+ methodName = st.nextToken();
+ }
+ return methodName;
+ }
+
+ public Path generateOutPath(String currentMethod) {
+ Path outPath = null;
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ outPath = new Path("/user/" + System.getenv("USER") + "/multiOutput/"
+ + currentMethod);
+ } else {
+ String workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+ outPath = new Path(workingDir + "/multiOutput/" + currentMethod);
+ System.out.println("output file: " + outPath.toString());
+ }
+ return outPath;
+ }
+
+ public void removeDir(Path outPath) throws IOException {
+ String command = null;
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
+ + outPath.toString();
+ } else {
+ StringTokenizer st = new StringTokenizer(outPath.toString(), ":");
+ int count = 0;
+ String file = null;
+ while (st.hasMoreElements()) {
+ count++;
+ String token = st.nextElement().toString();
+ if (count == 2)
+ file = token;
+ }
+ command = "rm -rf " + file;
+ }
+ Runtime runtime = Runtime.getRuntime();
+ Process proc = runtime.exec(command);
+ int exitVal = -1;
+ try {
+ exitVal = proc.waitFor();
+ } catch (InterruptedException e) {
+ System.err.println(e);
+ }
+
+ }
+
+ public static void writeToFile(String inputFile) throws IOException {
+ if (whichCluster.equalsIgnoreCase("miniCluster")) {
+ FileWriter fstream = new FileWriter(inputFile);
+ BufferedWriter out = new BufferedWriter(fstream);
+ out.write("us 2\n");
+ out.write("japan 2\n");
+ out.write("india 4\n");
+ out.write("us 2\n");
+ out.write("japan 1\n");
+ out.write("india 3\n");
+ out.write("nouse 5\n");
+ out.write("nowhere 4\n");
+ out.close();
+ }
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ FSDataOutputStream fout = fs.create(new Path(inputFile));
+ fout.writeBytes("us 2\n");
+ fout.writeBytes("japan 2\n");
+ fout.writeBytes("india 4\n");
+ fout.writeBytes("us 2\n");
+ fout.writeBytes("japan 1\n");
+ fout.writeBytes("india 3\n");
+ fout.writeBytes("nouse 5\n");
+ fout.writeBytes("nowhere 4\n");
+ fout.close();
+ }
+ }
+
+ public static void getTablePaths(String myMultiLocs) {
+ StringTokenizer st = new StringTokenizer(myMultiLocs, ",");
+
+ // get how many tokens inside st object
+ System.out.println("tokens count: " + st.countTokens());
+ int count = 0;
+
+ // iterate st object to get more tokens from it
+ while (st.hasMoreElements()) {
+ count++;
+ String token = st.nextElement().toString();
+ if (whichCluster.equalsIgnoreCase("miniCluster")) {
+ System.out.println("in mini, token: " + token);
+ // in mini, token:
+ // file:/homes/<uid>/grid/multipleoutput/pig-table/contrib/zebra/ustest3
+ if (count == 1)
+ strTable1 = token;
+ if (count == 2)
+ strTable2 = token;
+ if (count == 3)
+ strTable3 = token;
+ }
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ System.out.println("in real, token: " + token);
+ // in real, token: /user/hadoopqa/ustest3
+ // note: no prefix file: in real cluster
+ if (count == 1)
+ strTable1 = token;
+ if (count == 2)
+ strTable2 = token;
+ if (count == 3)
+ strTable3 = token;
+ }
+
+ }
+ }
+
+ public static void checkTableExists(boolean expected, String strDir)
+ throws IOException {
+
+ File theDir = null;
+ boolean actual = false;
+ if (whichCluster.equalsIgnoreCase("miniCluster")) {
+ theDir = new File(strDir.split(":")[1]);
+ actual = theDir.exists();
+
+ }
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ theDir = new File(strDir.split(":")[0]);
+ actual = fs.exists(new Path(theDir.toString()));
+ }
+ System.out.println("the dir : " + theDir.toString());
+
+ if (actual != expected) {
+ Assert.fail("dir exists or not is different from what expected.");
+ }
+ }
+
+ public static void checkTable(String myMultiLocs) throws IOException {
+ System.out.println("myMultiLocs:" + myMultiLocs);
+ System.out.println("sorgetTablePathst key:" + sortKey);
+
+ getTablePaths(myMultiLocs);
+ String query1 = null;
+ String query2 = null;
+
+ if (strTable1 != null) {
+
+ query1 = "records1 = LOAD '" + strTable1
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ }
+ if (strTable2 != null) {
+ query2 = "records2 = LOAD '" + strTable2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ }
+
+ int count1 = 0;
+ int count2 = 0;
+
+ if (query1 != null) {
+ System.out.println(query1);
+ pigServer.registerQuery(query1);
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ while (it.hasNext()) {
+ count1++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ // test 1 us table
+ if (query1.contains("test1") || query1.contains("test2")
+ || query1.contains("test3")) {
+
+ if (count1 == 1) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+ if (count1 == 2) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+ } // test1, test2
+
+ }// while
+ if (query1.contains("test1") || query1.contains("test2")
+ || query1.contains("test3")) {
+ Assert.assertEquals(2, count1);
+ }
+ }// if query1 != null
+
+ if (query2 != null) {
+ pigServer.registerQuery(query2);
+ Iterator<Tuple> it = pigServer.openIterator("records2");
+
+ while (it.hasNext()) {
+ count2++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+
+ // if test1 other table
+ if (query2.contains("test1")) {
+ if (count2 == 1) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(3, RowValue.get(1));
+ }
+ if (count2 == 2) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ if (count2 == 3) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(1, RowValue.get(1));
+ }
+ if (count2 == 4) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+
+ if (count2 == 5) {
+ Assert.assertEquals("nouse", RowValue.get(0));
+ Assert.assertEquals(5, RowValue.get(1));
+ }
+ if (count2 == 6) {
+ Assert.assertEquals("nowhere", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ }// if test1
+ // if test2 other table
+ if (query2.contains("test2")) {
+ if (count2 == 1) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ if (count2 == 2) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(3, RowValue.get(1));
+ }
+ if (count2 == 3) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+ if (count2 == 4) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(1, RowValue.get(1));
+ }
+
+ if (count2 == 5) {
+ Assert.assertEquals("nouse", RowValue.get(0));
+ Assert.assertEquals(5, RowValue.get(1));
+ }
+ if (count2 == 6) {
+ Assert.assertEquals("nowhere", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ }// if test2
+ // if test3 other table
+ if (query2.contains("test3")) {
+ if (count2 == 1) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(1, RowValue.get(1));
+ }
+ if (count2 == 2) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+ if (count2 == 3) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(3, RowValue.get(1));
+ }
+ if (count2 == 4) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ if (count2 == 5) {
+ Assert.assertEquals("nowhere", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ if (count2 == 6) {
+ Assert.assertEquals("nouse", RowValue.get(0));
+ Assert.assertEquals(5, RowValue.get(1));
+ }
+
+ }// if test3
+
+ }// while
+ if (query2.contains("test1") || query2.contains("test2")
+ || query2.contains("test3")) {
+ Assert.assertEquals(6, count2);
+ }
+ }// if query2 != null
+
+ }
+
+
+ @Test
+ public void test4() throws ParseException, IOException,
+ org.apache.hadoop.zebra.parser.ParseException, Exception {
+ /*
+ * The path list has a repeated element, e.g. the atest1 dir has already
+ * been created; this should throw an IOException complaining that
+ * atest4/CG0/.meta already exists.
+ */
+ System.out.println("******Start testcase: " + getCurrentMethodName());
+ sortKey = "word,count";
+ System.out.println("hello sort on word and count");
+ String methodName = getCurrentMethodName();
+ String myMultiLocs = null;
+ List<Path> paths = new ArrayList<Path>(3);
+
+ if (whichCluster.equalsIgnoreCase("realCluster")) {
+ myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "a"
+ + methodName + "," + "/user/" + System.getenv("USER") + "/" + "b"
+ + methodName);
+
+ paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+ + "a" + methodName)));
+ paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+ + "b" + methodName)));
+ } else {
+ RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+ fs = new LocalFileSystem(rawLFS);
+ myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "a"
+ + methodName + "," + fs.getWorkingDirectory() + "/" + "b"
+ + methodName);
+ paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "a"
+ + methodName)));
+ paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "b"
+ + methodName)));
+ }
+ getTablePaths(myMultiLocs);
+ removeDir(new Path(strTable1));
+ removeDir(new Path(strTable2));
+ runMR(sortKey, paths.toArray(new Path[2]));
+
+ }
+
+ static class MapClass extends
+ Mapper<LongWritable, Text, BytesWritable, Tuple> {
+ private BytesWritable bytesKey;
+ private Tuple tupleRow;
+ private Object javaObj;
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ // value should contain "word count"
+ String[] wdct = value.toString().split(" ");
+ if (wdct.length != 2) {
+ // LOG the error
+ return;
+ }
+
+ byte[] word = wdct[0].getBytes();
+ bytesKey.set(word, 0, word.length);
+ System.out.println("word: " + new String(word));
+ tupleRow.set(0, new String(word));
+ tupleRow.set(1, Integer.parseInt(wdct[1]));
+ System.out.println("count: " + Integer.parseInt(wdct[1]));
+
+ // This key has to be created by user
+ /*
+ * Tuple userKey = new DefaultTuple(); userKey.append(new String(word));
+ * userKey.append(Integer.parseInt(wdct[1]));
+ */
+ System.out.println("in map, sortkey: " + sortKey);
+ Tuple userKey = new DefaultTuple();
+ if (sortKey.equalsIgnoreCase("word,count")) {
+ userKey.append(new String(word));
+ userKey.append(Integer.parseInt(wdct[1]));
+ }
+
+ if (sortKey.equalsIgnoreCase("count")) {
+ userKey.append(Integer.parseInt(wdct[1]));
+ }
+
+ if (sortKey.equalsIgnoreCase("word")) {
+ userKey.append(new String(word));
+ }
+
+ try {
+
+ /* New M/R Interface */
+ /* Converts user key to zebra BytesWritable key */
+ /* using sort key expr tree */
+ /* Returns a java base object */
+ /* Done for each user key */
+
+ bytesKey = BasicTableOutputFormat.getSortKey(javaObj, userKey);
+ } catch (Exception e) {
+
+ }
+
+ context.write(bytesKey, tupleRow);
+ }
+
+ @Override
+ public void setup(Context context) {
+ bytesKey = new BytesWritable();
+ sortKey = context.getConfiguration().get("sortKey");
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(context);
+ tupleRow = TypesUtils.createTuple(outSchema);
+ javaObj = BasicTableOutputFormat.getSortKeyGenerator(context);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (org.apache.hadoop.zebra.parser.ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ static class ReduceClass extends
+ Reducer<BytesWritable, Tuple, BytesWritable, Tuple> {
+ Tuple outRow;
+
+ public void reduce(BytesWritable key, Iterator<Tuple> values, Context context)
+ throws IOException, InterruptedException {
+ try {
+ for (; values.hasNext();) {
+ context.write(key, values.next());
+ }
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ static class OutputPartitionerClass extends ZebraOutputPartition {
+
+ @Override
+ public int getOutputPartition(BytesWritable key, Tuple value)
+ throws ExecException {
+
+ String reg = null;
+ try {
+ reg = (String) (value.get(0));
+ } catch (Exception e) {
+ //
+ }
+
+ if (reg.equals("us"))
+ return 0;
+ else
+ return 1;
+
+ }
+ }
+
+ public void runMR(String sortKey, Path... paths) throws ParseException,
+ IOException, Exception, org.apache.hadoop.zebra.parser.ParseException {
+
+ Job job = new Job();
+ job.setJobName("tableMRSample");
+ Configuration conf = job.getConfiguration();
+ conf.set("table.output.tfile.compression", "gz");
+ conf.set("sortKey", sortKey);
+ // input settings
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(TestTempDirRemoval.MapClass.class);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(ZebraTuple.class);
+ FileInputFormat.setInputPaths(job, inputPath);
+
+ // TODO:
+ //job.setNumMapTasks(1);
+
+ // output settings
+
+ job.setOutputFormatClass(BasicTableOutputFormat.class);
+
+ String schema = "word:string, count:int";
+ String storageHint = "[word];[count]";
+ BasicTableOutputFormat.setMultipleOutputs(job,
+ TestTempDirRemoval.OutputPartitionerClass.class, paths);
+ ZebraSchema zSchema = ZebraSchema.createZebraSchema(schema);
+ ZebraStorageHint zStorageHint = ZebraStorageHint
+ .createZebraStorageHint(storageHint);
+ ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(sortKey, null);
+ BasicTableOutputFormat.setStorageInfo(job, zSchema, zStorageHint,
+ zSortInfo);
+ job.setNumReduceTasks(1);
+ job.submit();
+ job.waitForCompletion( true );
+ for ( int i =0; i < paths.length; ++i) {
+ Path tmpPath = new Path(paths[i], "_temporary");
+ FileSystem fileSys = tmpPath.getFileSystem(conf);
+ if(!fileSys.exists(tmpPath)) {
+ throw new RuntimeException("Temp Dir sld exist before BTOF.close() " + tmpPath.toString());
+ }
+ }
+ BasicTableOutputFormat.close( job );
+ for ( int i =0; i < paths.length; ++i) {
+ Path tmpPath = new Path(paths[i], "_temporary");
+ FileSystem fileSys = tmpPath.getFileSystem(conf);
+ if(fileSys.exists(tmpPath)) {
+ throw new RuntimeException("Temp Dir sld not exist after BTOF.close()" + tmpPath.toString());
+ }
+ }
+
+ }
+
+ public static void main(String[] args) throws ParseException,
+ org.apache.hadoop.zebra.parser.ParseException, Exception {
+ TestTempDirRemoval test = new TestTempDirRemoval();
+ TestTempDirRemoval.setUpOnce();
+
+ test.test4();
+ }
+}