Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/17 01:31:45 UTC

svn commit: r910786 - in /hadoop/pig/branches/load-store-redesign/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/io/ src/test/org/apache/hadoop/zebra/io/ src/test/org/apache/hadoop/zebra/mapreduce/

Author: yanz
Date: Wed Feb 17 00:31:44 2010
New Revision: 910786

URL: http://svn.apache.org/viewvc?rev=910786&view=rev
Log:
PIG-1115: cleanup of temp files left by failed tasks (gauravj via yanz)

Added:
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java
Modified:
    hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt?rev=910786&r1=910785&r2=910786&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt Wed Feb 17 00:31:44 2010
@@ -54,6 +54,8 @@
 
   BUG FIXES
 
+    PIG-1115: cleanup of temp files left by failed tasks (gauravj via yanz)
+
     PIG-1167: Hadoop file glob support (yanz)
 
     PIG-1153: Record split exception fix (yanz)

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=910786&r1=910785&r2=910786&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Wed Feb 17 00:31:44 2010
@@ -1225,6 +1225,10 @@
     boolean sorted;
     private boolean finished;
     Tuple[] cgTuples;
+    private Path actualOutputPath;
+    private Configuration writerConf;
 
     /**
      * Create a BasicTable writer. The semantics are as follows:
@@ -1262,6 +1266,8 @@
     public Writer(Path path, String btSchemaString, String btStorageString, String sortColumns,
         String comparator, Configuration conf) throws IOException {
       try {
+        actualOutputPath = path;
+        writerConf = conf;
         schemaFile =
             new SchemaFile(path, btSchemaString, btStorageString, sortColumns,
                 comparator, conf);
@@ -1337,15 +1343,20 @@
      */
     public Writer(Path path, Configuration conf) throws IOException {
       try {
+        actualOutputPath = path;
+        writerConf = conf;
         schemaFile = new SchemaFile(path, conf);
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         partition = schemaFile.getPartition();
         sorted = schemaFile.isSorted();
         colGroups = new ColumnGroup.Writer[numCGs];
         cgTuples = new Tuple[numCGs];
+        Path tmpWorkPath = new Path(path, "_temporary");
         for (int nx = 0; nx < numCGs; nx++) {
           colGroups[nx] =
-            new ColumnGroup.Writer(new Path(path, partition.getCGSchema(nx).getName()),
+            new ColumnGroup.Writer(
+                new Path(path, partition.getCGSchema(nx).getName()),
+                new Path(tmpWorkPath, partition.getCGSchema(nx).getName()),
                   conf);
           cgTuples[nx] = TypesUtils.createTuple(colGroups[nx].getSchema());
         }
@@ -1420,6 +1431,7 @@
      */
     @Override
     public void close() throws IOException {
+      cleanupTempDir();
       if (closed) return;
       closed = true;
       if (!finished)
@@ -1449,6 +1461,23 @@
         }
       }
     }
+
+    /**
+     * Removes the temporary directory ($path/_temporary) that holds
+     * intermediate data during record writing.
+     */
+    private void cleanupTempDir() throws IOException {
+      FileSystem fileSys = actualOutputPath.getFileSystem(writerConf);
+      Path pathToRemove = new Path(actualOutputPath, "_temporary");
+      if (fileSys.exists(pathToRemove)) {
+        if (!fileSys.delete(pathToRemove, true)) {
+          LOG.error("Failed to delete the temporary output directory: "
+              + pathToRemove.toString());
+        }
+      }
+    }
     
     /**
      * Get the schema of the table.

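For reference, the cleanup that BasicTable.Writer.close() now performs amounts to the
following standalone sketch. The FileSystem, Path and Configuration calls are the real
Hadoop APIs used in the hunk above; the surrounding class and method names are
illustrative only, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative sketch only; mirrors cleanupTempDir() in BasicTable.Writer above.
    public class TempDirCleanup {
      /** Recursively deletes outputPath/_temporary if it exists. */
      public static void cleanup(Path outputPath, Configuration conf) throws IOException {
        FileSystem fs = outputPath.getFileSystem(conf);
        Path tmp = new Path(outputPath, "_temporary");
        if (fs.exists(tmp) && !fs.delete(tmp, true)) {
          System.err.println("Failed to delete the temporary output directory: " + tmp);
        }
      }
    }
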
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=910786&r1=910785&r2=910786&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Wed Feb 17 00:31:44 2010
@@ -1280,6 +1280,7 @@
    */
   public static class Writer implements Closeable {
     Path path;
+    Path finalOutputPath;
     Configuration conf;
     FileSystem fs;
     CGSchema cgschema;
@@ -1350,6 +1351,7 @@
         throws IOException, ParseException {
       this.path = path;
       this.conf = conf;
+      this.finalOutputPath = path;
 
       fs = path.getFileSystem(conf);
 
@@ -1379,6 +1381,25 @@
     }
 
     /**
+     * Reopen an already created ColumnGroup for writing, using a temporary
+     * work path where the column group's inserters write their intermediate
+     * data. RuntimeException will be thrown if the table is already closed,
+     * or if createMetaBlock() is called by some other process.
+     */
+    public Writer(Path finalPath, Path workPath, Configuration conf) throws IOException,
+        ParseException {
+      this.path = workPath;
+      finalOutputPath = finalPath;
+      this.conf = conf;
+      fs = path.getFileSystem(conf);
+      checkPath(finalOutputPath, false);
+      checkPath(path, true);
+      checkMetaFile(finalOutputPath);
+      cgschema = CGSchema.load(fs, finalOutputPath);
+    }
+
+    /**
      * Reopen an already created ColumnGroup for writing. RuntimeException will
      * be thrown if the table is already closed, or if createMetaBlock() is
      * called by some other process.
@@ -1386,6 +1407,7 @@
     public Writer(Path path, Configuration conf) throws IOException,
         ParseException {
       this.path = path;
+      finalOutputPath = path;
       this.conf = conf;
       fs = path.getFileSystem(conf);
       checkPath(path, false);
@@ -1444,9 +1466,9 @@
 
     private void createIndex() throws IOException {
       MetaFile.Writer metaFile =
-          MetaFile.createWriter(makeMetaFilePath(path), conf);
+          MetaFile.createWriter(makeMetaFilePath(finalOutputPath), conf);
       if (cgschema.isSorted()) {
-        CGIndex index = buildIndex(fs, path, false, conf);
+        CGIndex index = buildIndex(fs, finalOutputPath, false, conf);
         DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
         try {
           index.write(dos);
@@ -1629,7 +1651,9 @@
           out.close();
           out = null;
           // do renaming only if all the above is successful.
-          fs.rename(new Path(path, tmpName), new Path(path, name));
+          fs.rename(new Path(path, tmpName), new Path(finalOutputPath, name));
 /*
           if(cgschema.getOwner() != null || cgschema.getGroup() != null) {
             fs.setOwner(new Path(path, name), cgschema.getOwner(), cgschema.getGroup());

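The intended write path for the new two-path Writer can be seen in createCG() of the
test added below: a task reopens the column group with the final path plus a work path,
inserts through the work path, and the finished part file is renamed into the final
path when the inserter closes. A minimal sketch under those assumptions follows; the
paths, part name and row loop are illustrative, while the constructor, getInserter()
and close() calls follow the signatures used in this patch:

    Configuration conf = new Configuration();
    Path cgPath = new Path("/table/CG0");                       // illustrative final column-group dir
    Path workPath = new Path(cgPath.getParent(), "_temporary"); // illustrative per-job work dir

    // Reopen the already-created column group, directing intermediate output
    // to the work path while index/meta stay under the final path.
    ColumnGroup.Writer writer = new ColumnGroup.Writer(cgPath, workPath, conf);
    TableInserter inserter = writer.getInserter("part-000000", true);
    // ... inserter.insert(key, tuple) for each row ...
    inserter.close();   // the finished tfile is renamed from the work path into cgPath

    // Finalize the column group; the index is built and written under the final path.
    new ColumnGroup.Writer(cgPath, conf).close();
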
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java?rev=910786&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java Wed Feb 17 00:31:44 2010
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.ColumnGroup;
+import org.apache.hadoop.zebra.io.KeyDistribution;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests ColumnGroup APIs called as if from MapReduce jobs, using a separate work path
+ */
+public class TestColumnGroupWithWorkPath {
+  static Configuration conf;
+  static Random random;
+  static Path rootPath;
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+    conf.set("io.compression.codec.lzo.class", "no");
+    random = new Random(System.nanoTime());
+    rootPath = new Path(System.getProperty("test.build.data",
+        "build/test/data/workdir3"));
+    fs = rootPath.getFileSystem(conf);
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  BytesWritable makeRandomKey(int max) {
+    return makeKey(random.nextInt(max));
+  }
+
+  static BytesWritable makeKey(int i) {
+    return new BytesWritable(String.format("key%09d", i).getBytes());
+  }
+
+  String makeString(String prefix, int max) {
+    return String.format("%s%09d", prefix, random.nextInt(max));
+  }
+
+  int createCG(int parts, int rows, String strSchema, Path path,
+      boolean properClose, boolean sorted, int[] emptyTFiles)
+      throws IOException, ParseException {
+    if (fs.exists(path)) {
+      ColumnGroup.drop(path, conf);
+    }
+
+    Set<Integer> emptyTFileSet = new HashSet<Integer>();
+    if (emptyTFiles != null) {
+      for (int i = 0; i < emptyTFiles.length; ++i) {
+        emptyTFileSet.add(emptyTFiles[i]);
+      }
+    }
+
+    ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, sorted, path.getName(),
+        "pig", "gz", "root", null, (short) Short.parseShort("755", 8), false, conf);
+
+    writer.finish();
+
+    int total = 0;
+    Schema schema = new Schema(strSchema);
+    String colNames[] = schema.getColumns();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    int[] permutation = new int[parts];
+    for (int i = 0; i < parts; ++i) {
+      permutation[i] = i;
+    }
+
+    for (int i = parts - 1; i > 0; --i) {
+      int targetIndex = random.nextInt(i + 1);
+      int tmp = permutation[i];
+      permutation[i] = permutation[targetIndex];
+      permutation[targetIndex] = tmp;
+    }
+
+    for (int i = 0; i < parts; ++i) {
+      Path workPath = new Path(path.getParent(), "_temporary");
+      writer = new ColumnGroup.Writer(path, workPath, conf);
+      TableInserter inserter = writer.getInserter(String.format("part-%06d",
+          permutation[i]), true);
+      if ((rows > 0) && !emptyTFileSet.contains(permutation[i])) {
+        int actualRows = random.nextInt(rows) + rows / 2;
+        for (int j = 0; j < actualRows; ++j, ++total) {
+          BytesWritable key;
+          if (!sorted) {
+            key = makeRandomKey(rows * 10);
+          } else {
+            key = makeKey(total);
+          }
+          TypesUtils.resetTuple(tuple);
+          for (int k = 0; k < tuple.size(); ++k) {
+            try {
+              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+            } catch (ExecException e) {
+              e.printStackTrace();
+            }
+          }
+          inserter.insert(key, tuple);
+        }
+      }
+      inserter.close();
+    }
+
+    if (properClose) {
+      writer = new ColumnGroup.Writer(path, conf);
+      writer.close();
+      /* We can only test number of rows on sorted tables.*/
+      if (sorted) {
+        BasicTableStatus status = getStatus(path);
+        Assert.assertEquals(total, status.getRows());
+      }
+    }
+
+    return total;
+  }
+
+  static class DupKeyGen {
+    int low, high;
+    int current;
+    boolean grow = true;
+    int index = 0;
+    int count = 0;
+
+    DupKeyGen(int low, int high) {
+      this.low = Math.max(10, low);
+      this.high = Math.max(this.low * 2, high);
+      current = this.low;
+    }
+
+    BytesWritable next() {
+      if (count == 0) {
+        count = nextCount();
+        ++index;
+      }
+      --count;
+      return makeKey(index);
+    }
+
+    int nextCount() {
+      int ret = current;
+      if ((grow && current > high) || (!grow && current < low)) {
+        grow = !grow;
+      }
+      if (grow) {
+        current *= 2;
+      } else {
+        current /= 2;
+      }
+      return ret;
+    }
+  }
+
+  int createCGDupKeys(int parts, int rows, String strSchema, Path path)
+      throws IOException, ParseException {
+    if (fs.exists(path)) {
+      ColumnGroup.drop(path, conf);
+    }
+
+    ColumnGroup.Writer writer = new ColumnGroup.Writer(path, strSchema, true, path.getName(),
+        "pig", "gz", "root", null, (short) Short.parseShort("777", 8), false, conf);
+    writer.finish();
+
+    int total = 0;
+    DupKeyGen keyGen = new DupKeyGen(10, rows * 3);
+    Schema schema = new Schema(strSchema);
+    String colNames[] = schema.getColumns();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    int[] permutation = new int[parts];
+    for (int i = 0; i < parts; ++i) {
+      permutation[i] = i;
+    }
+
+    for (int i = parts - 1; i > 0; --i) {
+      int targetIndex = random.nextInt(i + 1);
+      int tmp = permutation[i];
+      permutation[i] = permutation[targetIndex];
+      permutation[targetIndex] = tmp;
+    }
+
+    for (int i = 0; i < parts; ++i) {
+      writer = new ColumnGroup.Writer(path, conf);
+      TableInserter inserter = writer.getInserter(String.format("part-%06d",
+          permutation[i]), true);
+      if (rows > 0) {
+        int actualRows = random.nextInt(rows * 2 / 3) + rows * 2 / 3;
+        for (int j = 0; j < actualRows; ++j, ++total) {
+          BytesWritable key = keyGen.next();
+          TypesUtils.resetTuple(tuple);
+          for (int k = 0; k < tuple.size(); ++k) {
+            try {
+              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+            } catch (ExecException e) {
+              e.printStackTrace();
+            }
+          }
+          inserter.insert(key, tuple);
+        }
+      }
+      inserter.close();
+    }
+
+    writer = new ColumnGroup.Writer(path, conf);
+    writer.close();
+    BasicTableStatus status = getStatus(path);
+    Assert.assertEquals(total, status.getRows());
+
+    return total;
+  }
+
+  void rangeSplitCG(int numSplits, int totalRows, String strProjection,
+      Path path) throws IOException, ParseException {
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection(strProjection);
+    long totalBytes = reader.getStatus().getSize();
+
+    List<CGRangeSplit> splits = reader.rangeSplit(numSplits);
+    reader.close();
+    int total = 0;
+    for (int i = 0; i < splits.size(); ++i) {
+      reader = new ColumnGroup.Reader(path, conf);
+      reader.setProjection(strProjection);
+      total += doReadOnly(reader.getScanner(splits.get(i), true));
+      totalBytes -= reader.getBlockDistribution(splits.get(i)).getLength();
+    }
+    Assert.assertEquals(total, totalRows);
+    Assert.assertEquals(totalBytes, 0L);
+  }
+
+  void doRangeSplit(int[] numSplits, int totalRows, String projection, Path path)
+      throws IOException, ParseException {
+    for (int i : numSplits) {
+      if (i > 0) {
+        rangeSplitCG(i, totalRows, projection, path);
+      }
+    }
+  }
+
+  void keySplitCG(int numSplits, int totalRows, String strProjection, Path path)
+      throws IOException, ParseException {
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection(strProjection);
+    long totalBytes = reader.getStatus().getSize();
+    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
+    Assert.assertEquals(totalBytes, keyDistri.length());
+    reader.close();
+    BytesWritable[] keys = null;
+    if (keyDistri.size() >= numSplits) {
+      keyDistri.resize(numSplits);
+      Assert.assertEquals(totalBytes, keyDistri.length());
+      RawComparable[] rawComparables = keyDistri.getKeys();
+      keys = new BytesWritable[rawComparables.length];
+      for (int i = 0; i < keys.length; ++i) {
+        keys[i] = new BytesWritable();
+        keys[i].setSize(rawComparables[i].size());
+        System.arraycopy(rawComparables[i].buffer(),
+            rawComparables[i].offset(), keys[i].get(), 0, rawComparables[i]
+                .size());
+      }
+    } else {
+      int targetSize = Math.min(totalRows / 10, numSplits);
+      // revert to manually cooked up keys.
+      Set<Integer> keySets = new TreeSet<Integer>();
+      while (keySets.size() < targetSize) {
+        keySets.add(random.nextInt(totalRows));
+      }
+      keys = new BytesWritable[targetSize];
+      if (!keySets.isEmpty()) {
+        int j = 0;
+        for (int i : keySets.toArray(new Integer[keySets.size()])) {
+          keys[j] = makeKey(i);
+          ++j;
+        }
+      }
+    }
+
+    int total = 0;
+    for (int i = 0; i < keys.length; ++i) {
+      reader = new ColumnGroup.Reader(path, conf);
+      reader.setProjection(strProjection);
+      BytesWritable begin = (i == 0) ? null : keys[i - 1];
+      BytesWritable end = (i == keys.length - 1) ? null : keys[i];
+      total += doReadOnly(reader.getScanner(begin, end, true));
+    }
+    Assert.assertEquals(total, totalRows);
+  }
+
+  void doKeySplit(int[] numSplits, int totalRows, String projection, Path path)
+      throws IOException, ParseException {
+    for (int i : numSplits) {
+      if (i > 0) {
+        keySplitCG(i, totalRows, projection, path);
+      }
+    }
+  }
+
+  BasicTableStatus getStatus(Path path) throws IOException, ParseException {
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    try {
+      return reader.getStatus();
+    } finally {
+      reader.close();
+    }
+  }
+
+  void doReadWrite(Path path, int parts, int rows, String schema,
+      String projection, boolean properClose, boolean sorted, int[] emptyTFiles)
+      throws IOException, ParseException {
+    int totalRows = createCG(parts, rows, schema, path, properClose, sorted,
+        emptyTFiles);
+    if (rows == 0) {
+      Assert.assertEquals(rows, 0);
+    }
+
+    doRangeSplit(new int[] { 1, 2, parts / 2, parts, 2 * parts }, totalRows,
+        projection, path);
+    if (sorted) {
+      doKeySplit(new int[] { 1, 2, parts / 2, parts, 2 * parts, 10 * parts },
+          totalRows, projection, path);
+    }
+  }
+
+  int doReadOnly(TableScanner scanner) throws IOException, ParseException {
+    int total = 0;
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+    for (; !scanner.atEnd(); scanner.advance()) {
+      ++total;
+      switch (random.nextInt() % 4) {
+      case 0:
+        scanner.getKey(key);
+        break;
+      case 1:
+        scanner.getValue(value);
+        break;
+      case 2:
+        scanner.getKey(key);
+        scanner.getValue(value);
+        break;
+      default: // no-op.
+      }
+    }
+    scanner.close();
+
+    return total;
+  }
+
+  @Test
+  public void testNullSplits() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupNullSplits");
+    int totalRows = createCG(2, 10, "a, b, c", path, true, true, null);
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    reader.setProjection("a,d,c,f");
+    Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
+    Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, null,
+        false)));
+    reader.close();
+  }
+
+  @Test
+  public void testNegativeSplits() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestNegativeSplits");
+    int totalRows = createCG(2, 100, "a, b, c", path, true, true, null);
+    rangeSplitCG(-1, totalRows, "a,d,c,f", path);
+  }
+
+  @Test
+  public void testEmptyCG() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
+    doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", false, false, null);
+    doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
+    doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
+  }
+
+  @Test
+  public void testEmptyTFiles() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
+    doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", false, false, null);
+    doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
+    doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);    
+  }
+
+  public void testNormalCases() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupNormal");
+    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
+    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
+    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
+  }
+
+  @Test
+  public void testSomeEmptyTFiles() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
+    for (int[] emptyTFiles : new int[][] { { 1, 2 } }) {
+      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
+          emptyTFiles);
+      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
+          emptyTFiles);
+      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,
+          emptyTFiles);    
+    }
+  }
+
+  int countRows(Path path, String projection) throws IOException,
+    ParseException {
+    ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
+    if (projection != null) {
+      reader.setProjection(projection);
+    }
+    int totalRows = 0;
+    TableScanner scanner = reader.getScanner(null, true);
+    for (; !scanner.atEnd(); scanner.advance()) {
+      ++totalRows;
+    }
+    scanner.close();
+    return totalRows;
+  }
+
+  @Test
+  public void testProjection() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupProjection");
+    int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+    Assert.assertEquals(totalRows, countRows(path, null));
+    Assert.assertEquals(totalRows, countRows(path, ""));
+  }
+
+  @Test
+  public void testDuplicateKeys() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestColumnGroupDuplicateKeys");
+    int totalRows = createCGDupKeys(2, 250, "a, b, c", path);
+    doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
+        path);
+  }
+
+  @Test
+  public void testSortedCGKeySplit() throws IOException, ParseException {
+    conf.setInt("table.output.tfile.minBlock.size", 640 * 1024);
+    Path path = new Path(rootPath, "TestSortedCGKeySplit");
+    int totalRows = createCG(2, 250, "a, b, c", path, true, true, null);
+    doKeySplit(new int[] { 1, 5 }, totalRows, "a, d, c, f",
+        path);
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java?rev=910786&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTempDirRemoval.java Wed Feb 17 00:31:44 2010
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
+import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that the temporary directories created under the output paths are
+ * removed by BasicTableOutputFormat.close(). This is otherwise a complete MR
+ * sample program for Table; it doesn't contain the 'read' part, but that
+ * should be similar and easier to write. Refer to the test cases in the same
+ * directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * us 2
+ * japan 2
+ * india 4
+ * us 2
+ * japan 1
+ * india 3
+ * nouse 5
+ * nowhere 4
+ * </pre>
+ */
+public class TestTempDirRemoval {
+
+  static String inputPath;
+  static String inputFileName = "multi-input.txt";
+  protected static ExecType execType = ExecType.LOCAL;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  // private static Path pathWorking, pathTable1, path2, path3,
+  // pathTable4, pathTable5;
+  private static Configuration conf;
+  public static String sortKey = null;
+
+  private static FileSystem fs;
+
+  private static String zebraJar;
+  private static String whichCluster;
+  private static String multiLocs;
+  private static String strTable1 = null;
+  private static String strTable2 = null;
+  private static String strTable3 = null;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    if (System.getenv("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (System.getProperty("whichCluster") == null) {
+      System.setProperty("whichCluster", "miniCluster");
+      System.out.println("should be called");
+      whichCluster = System.getProperty("whichCluster");
+    } else {
+      whichCluster = System.getProperty("whichCluster");
+    }
+
+    System.out.println("cluster: " + whichCluster);
+    System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
+    System.out.println(" get env user name: " + System.getenv("USER"));
+    if ((whichCluster.equalsIgnoreCase("realCluster") && System
+        .getenv("HADOOP_HOME") == null)) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(0);
+    }
+
+    conf = new Configuration();
+
+    if ((whichCluster.equalsIgnoreCase("realCluster") && System.getenv("USER") == null)) {
+      System.out.println("Please set USER");
+      System.exit(0);
+    }
+    zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
+
+    File file = new File(zebraJar);
+    if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Please put zebra.jar at hadoop_home/lib");
+      System.exit(0);
+    }
+
+    // set inputPath and output path
+    String workingDir = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      inputPath = new String("/user/" + System.getenv("USER") + "/"
+          + inputFileName);
+      System.out.println("inputPath: " + inputPath);
+      multiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+          + "," + "/user/" + System.getenv("USER") + "/" + "india" + ","
+          + "/user/" + System.getenv("USER") + "/" + "japan");
+      fs = new Path(inputPath).getFileSystem(conf);
+
+    } else {
+      RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+      fs = new LocalFileSystem(rawLFS);
+      workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+      inputPath = new String(workingDir + "/" + inputFileName);
+      System.out.println("inputPath: " + inputPath);
+      multiLocs = new String(workingDir + "/" + "us" + "," + workingDir + "/"
+          + "india" + "," + workingDir + "/" + "japan");
+    }
+    writeToFile(inputPath);
+    // check inputPath existence
+    File inputFile = new File(inputPath);
+    if (!inputFile.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Please put inputFile in hdfs: " + inputPath);
+      // System.exit(0);
+    }
+    if (!inputFile.exists() && whichCluster.equalsIgnoreCase("miniCluster")) {
+      System.out
+          .println("Please put inputFile under workingdir. working dir is : "
+              + workingDir);
+      System.exit(0);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+          .toProperties(conf));
+      pigServer.registerJar(zebraJar);
+
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+  }
+
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+    StringTokenizer st = new StringTokenizer(t, ".");
+    String methodName = null;
+    while (st.hasMoreTokens()) {
+      methodName = st.nextToken();
+    }
+    return methodName;
+  }
+
+  public Path generateOutPath(String currentMethod) {
+    Path outPath = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      outPath = new Path("/user/" + System.getenv("USER") + "/multiOutput/"
+          + currentMethod);
+    } else {
+      String workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+      outPath = new Path(workingDir + "/multiOutput/" + currentMethod);
+      System.out.println("output file: " + outPath.toString());
+    }
+    return outPath;
+  }
+
+  public void removeDir(Path outPath) throws IOException {
+    String command = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
+          + outPath.toString();
+    } else {
+      StringTokenizer st = new StringTokenizer(outPath.toString(), ":");
+      int count = 0;
+      String file = null;
+      while (st.hasMoreElements()) {
+        count++;
+        String token = st.nextElement().toString();
+        if (count == 2)
+          file = token;
+      }
+      command = "rm -rf " + file;
+    }
+    Runtime runtime = Runtime.getRuntime();
+    Process proc = runtime.exec(command);
+    int exitVal = -1;
+    try {
+      exitVal = proc.waitFor();
+    } catch (InterruptedException e) {
+      System.err.println(e);
+    }
+
+  }
+
+  public static void writeToFile(String inputFile) throws IOException {
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      FileWriter fstream = new FileWriter(inputFile);
+      BufferedWriter out = new BufferedWriter(fstream);
+      out.write("us 2\n");
+      out.write("japan 2\n");
+      out.write("india 4\n");
+      out.write("us 2\n");
+      out.write("japan 1\n");
+      out.write("india 3\n");
+      out.write("nouse 5\n");
+      out.write("nowhere 4\n");
+      out.close();
+    }
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      FSDataOutputStream fout = fs.create(new Path(inputFile));
+      fout.writeBytes("us 2\n");
+      fout.writeBytes("japan 2\n");
+      fout.writeBytes("india 4\n");
+      fout.writeBytes("us 2\n");
+      fout.writeBytes("japan 1\n");
+      fout.writeBytes("india 3\n");
+      fout.writeBytes("nouse 5\n");
+      fout.writeBytes("nowhere 4\n");
+      fout.close();
+    }
+  }
+
+  public static void getTablePaths(String myMultiLocs) {
+    StringTokenizer st = new StringTokenizer(myMultiLocs, ",");
+
+    // get how many tokens inside st object
+    System.out.println("tokens count: " + st.countTokens());
+    int count = 0;
+
+    // iterate st object to get more tokens from it
+    while (st.hasMoreElements()) {
+      count++;
+      String token = st.nextElement().toString();
+      if (whichCluster.equalsIgnoreCase("miniCluster")) {
+        System.out.println("in mini, token: " + token);
+        // in mini, token:
+        // file:/homes/<uid>/grid/multipleoutput/pig-table/contrib/zebra/ustest3
+        if (count == 1)
+          strTable1 = token;
+        if (count == 2)
+          strTable2 = token;
+        if (count == 3)
+          strTable3 = token;
+      }
+      if (whichCluster.equalsIgnoreCase("realCluster")) {
+        System.out.println("in real, token: " + token);
+        // in real, token: /user/hadoopqa/ustest3
+        // note: no prefix file: in real cluster
+        if (count == 1)
+          strTable1 = token;
+        if (count == 2)
+          strTable2 = token;
+        if (count == 3)
+          strTable3 = token;
+      }
+
+    }
+  }
+
+  public static void checkTableExists(boolean expected, String strDir)
+      throws IOException {
+
+    File theDir = null;
+    boolean actual = false;
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      theDir = new File(strDir.split(":")[1]);
+      actual = theDir.exists();
+
+    }
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      theDir = new File(strDir.split(":")[0]);
+      actual = fs.exists(new Path(theDir.toString()));
+    }
+    System.out.println("the dir : " + theDir.toString());
+   
+    if (actual != expected) {
+      Assert.fail("dir exists or not is different from what expected.");
+    }
+  }
+
+  public static void checkTable(String myMultiLocs) throws IOException {
+    System.out.println("myMultiLocs:" + myMultiLocs);
+    System.out.println("sort key: " + sortKey);
+
+    getTablePaths(myMultiLocs);
+    String query1 = null;
+    String query2 = null;
+
+    if (strTable1 != null) {
+
+      query1 = "records1 = LOAD '" + strTable1
+          + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    }
+    if (strTable2 != null) {
+      query2 = "records2 = LOAD '" + strTable2
+          + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    }
+
+    int count1 = 0;
+    int count2 = 0;
+
+    if (query1 != null) {
+      System.out.println(query1);
+      pigServer.registerQuery(query1);
+      Iterator<Tuple> it = pigServer.openIterator("records1");
+      while (it.hasNext()) {
+        count1++;
+        Tuple RowValue = it.next();
+        System.out.println(RowValue);
+        // test 1 us table
+        if (query1.contains("test1") || query1.contains("test2")
+            || query1.contains("test3")) {
+
+          if (count1 == 1) {
+            Assert.assertEquals("us", RowValue.get(0));
+            Assert.assertEquals(2, RowValue.get(1));
+          }
+          if (count1 == 2) {
+            Assert.assertEquals("us", RowValue.get(0));
+            Assert.assertEquals(2, RowValue.get(1));
+          }
+        } // test1, test2
+
+      }// while
+      if (query1.contains("test1") || query1.contains("test2")
+          || query1.contains("test3")) {
+        Assert.assertEquals(2, count1);
+      }
+    }// if query1 != null
+
+    if (query2 != null) {
+      pigServer.registerQuery(query2);
+      Iterator<Tuple> it = pigServer.openIterator("records2");
+
+      while (it.hasNext()) {
+        count2++;
+        Tuple RowValue = it.next();
+        System.out.println(RowValue);
+
+        // if test1 other table
+        if (query2.contains("test1")) {
+          if (count2 == 1) {
+            Assert.assertEquals("india", RowValue.get(0));
+            Assert.assertEquals(3, RowValue.get(1));
+          }
+          if (count2 == 2) {
+            Assert.assertEquals("india", RowValue.get(0));
+            Assert.assertEquals(4, RowValue.get(1));
+          }
+          if (count2 == 3) {
+            Assert.assertEquals("japan", RowValue.get(0));
+            Assert.assertEquals(1, RowValue.get(1));
+          }
+          if (count2 == 4) {
+            Assert.assertEquals("japan", RowValue.get(0));
+            Assert.assertEquals(2, RowValue.get(1));
+          }
+
+          if (count2 == 5) {
+            Assert.assertEquals("nouse", RowValue.get(0));
+            Assert.assertEquals(5, RowValue.get(1));
+          }
+          if (count2 == 6) {
+            Assert.assertEquals("nowhere", RowValue.get(0));
+            Assert.assertEquals(4, RowValue.get(1));
+          }
+        }// if test1
+        // if test2 other table
+        if (query2.contains("test2")) {
+          if (count2 == 1) {
+            Assert.assertEquals("india", RowValue.get(0));
+            Assert.assertEquals(4, RowValue.get(1));
+          }
+          if (count2 == 2) {
+            Assert.assertEquals("india", RowValue.get(0));
+            Assert.assertEquals(3, RowValue.get(1));
+          }
+          if (count2 == 3) {
+            Assert.assertEquals("japan", RowValue.get(0));
+            Assert.assertEquals(2, RowValue.get(1));
+          }
+          if (count2 == 4) {
+            Assert.assertEquals("japan", RowValue.get(0));
+            Assert.assertEquals(1, RowValue.get(1));
+          }
+
+          if (count2 == 5) {
+            Assert.assertEquals("nouse", RowValue.get(0));
+            Assert.assertEquals(5, RowValue.get(1));
+          }
+          if (count2 == 6) {
+            Assert.assertEquals("nowhere", RowValue.get(0));
+            Assert.assertEquals(4, RowValue.get(1));
+          }
+        }// if test2
+        // if test3 other table
+        if (query2.contains("test3")) {
+          if (count2 == 1) {
+            Assert.assertEquals("japan", RowValue.get(0));
+            Assert.assertEquals(1, RowValue.get(1));
+          }
+          if (count2 == 2) {
+            Assert.assertEquals("japan", RowValue.get(0));
+            Assert.assertEquals(2, RowValue.get(1));
+          }
+          if (count2 == 3) {
+            Assert.assertEquals("india", RowValue.get(0));
+            Assert.assertEquals(3, RowValue.get(1));
+          }
+          if (count2 == 4) {
+            Assert.assertEquals("india", RowValue.get(0));
+            Assert.assertEquals(4, RowValue.get(1));
+          }
+          if (count2 == 5) {
+            Assert.assertEquals("nowhere", RowValue.get(0));
+            Assert.assertEquals(4, RowValue.get(1));
+          }
+          if (count2 == 6) {
+            Assert.assertEquals("nouse", RowValue.get(0));
+            Assert.assertEquals(5, RowValue.get(1));
+          }
+
+        }// if test3
+
+      }// while
+      if (query2.contains("test1") || query2.contains("test2")
+          || query2.contains("test3")) {
+        Assert.assertEquals(6, count2);
+      }
+    }// if query2 != null
+
+  }
+
+
+  @Test
+  public void test4() throws ParseException, IOException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+    /*
+     * Runs the multi-output MR job and verifies that the _temporary work
+     * directories exist under each output path before
+     * BasicTableOutputFormat.close() and are removed by it.
+     */
+    System.out.println("******Start  testcase: " + getCurrentMethodName());
+    sortKey = "word,count";
+    System.out.println("hello sort on word and count");
+    String methodName = getCurrentMethodName();
+    String myMultiLocs = null;
+    List<Path> paths = new ArrayList<Path>(3);
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "a"
+          + methodName + "," + "/user/" + System.getenv("USER") + "/" + "b"
+          + methodName);
+
+      paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+          + "a" + methodName)));
+      paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+          + "b" + methodName)));
+    } else {
+      RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+      fs = new LocalFileSystem(rawLFS);
+      myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "a"
+          + methodName + "," + fs.getWorkingDirectory() + "/" + "b"
+          + methodName);
+      paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "a"
+          + methodName)));
+      paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "b"
+          + methodName)));
+    }
+    getTablePaths(myMultiLocs);
+    removeDir(new Path(strTable1));
+    removeDir(new Path(strTable2));
+    runMR(sortKey, paths.toArray(new Path[2]));
+
+  }
+
+  static class MapClass extends
+      Mapper<LongWritable, Text, BytesWritable, Tuple> {
+    private BytesWritable bytesKey;
+    private Tuple tupleRow;
+    private Object javaObj;
+
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      // value should contain "word count"
+      String[] wdct = value.toString().split(" ");
+      if (wdct.length != 2) {
+        // LOG the error
+        return;
+      }
+
+      byte[] word = wdct[0].getBytes();
+      bytesKey.set(word, 0, word.length);
+      System.out.println("word: " + new String(word));
+      tupleRow.set(0, new String(word));
+      tupleRow.set(1, Integer.parseInt(wdct[1]));
+      System.out.println("count:  " + Integer.parseInt(wdct[1]));
+
+      // This key has to be created by user
+      /*
+       * Tuple userKey = new DefaultTuple(); userKey.append(new String(word));
+       * userKey.append(Integer.parseInt(wdct[1]));
+       */
+      System.out.println("in map, sortkey: " + sortKey);
+      Tuple userKey = new DefaultTuple();
+      if (sortKey.equalsIgnoreCase("word,count")) {
+        userKey.append(new String(word));
+        userKey.append(Integer.parseInt(wdct[1]));
+      }
+
+      if (sortKey.equalsIgnoreCase("count")) {
+        userKey.append(Integer.parseInt(wdct[1]));
+      }
+
+      if (sortKey.equalsIgnoreCase("word")) {
+        userKey.append(new String(word));
+      }
+
+      try {
+
+        /* New M/R Interface */
+        /* Converts user key to zebra BytesWritable key */
+        /* using sort key expr tree */
+        /* Returns a java base object */
+        /* Done for each user key */
+
+        bytesKey = BasicTableOutputFormat.getSortKey(javaObj, userKey);
+      } catch (Exception e) {
+
+      }
+
+      context.write(bytesKey, tupleRow);
+    }
+
+    @Override
+    public void setup(Context context) {
+      bytesKey = new BytesWritable();
+      sortKey = context.getConfiguration().get("sortKey");
+      try {
+        Schema outSchema = BasicTableOutputFormat.getSchema(context);
+        tupleRow = TypesUtils.createTuple(outSchema);
+        javaObj = BasicTableOutputFormat.getSortKeyGenerator(context);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (org.apache.hadoop.zebra.parser.ParseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  static class ReduceClass extends
+      Reducer<BytesWritable, Tuple, BytesWritable, Tuple> {
+    Tuple outRow;
+
+    public void reduce(BytesWritable key, Iterator<Tuple> values, Context context)
+        throws IOException, InterruptedException {
+      try {
+        for (; values.hasNext();) {
+          context.write(key, values.next());
+        }
+      } catch (ExecException e) {
+        e.printStackTrace();
+      }
+    }
+
+  }
+
+  static class OutputPartitionerClass extends ZebraOutputPartition {
+
+    @Override
+    public int getOutputPartition(BytesWritable key, Tuple value)
+        throws ExecException {
+
+      String reg = null;
+      try {
+        reg = (String) (value.get(0));
+      } catch (Exception e) {
+        //
+      }
+
+      if (reg.equals("us"))
+        return 0;
+      else
+        return 1;
+
+    }
+  }
+
+  public void runMR(String sortKey, Path... paths) throws ParseException,
+      IOException, Exception, org.apache.hadoop.zebra.parser.ParseException {
+
+    Job job = new Job();
+    job.setJobName("tableMRSample");
+    Configuration conf = job.getConfiguration();
+    conf.set("table.output.tfile.compression", "gz");
+    conf.set("sortKey", sortKey);
+    // input settings
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(TestTempDirRemoval.MapClass.class);
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(ZebraTuple.class);
+    FileInputFormat.setInputPaths(job, inputPath);
+
+    // TODO: 
+    //job.setNumMapTasks(1);
+
+    // output settings
+
+    job.setOutputFormatClass(BasicTableOutputFormat.class);
+    
+    String schema = "word:string, count:int";
+    String storageHint = "[word];[count]";
+    BasicTableOutputFormat.setMultipleOutputs(job,
+        TestTempDirRemoval.OutputPartitionerClass.class, paths);
+    ZebraSchema zSchema = ZebraSchema.createZebraSchema(schema);
+    ZebraStorageHint zStorageHint = ZebraStorageHint
+        .createZebraStorageHint(storageHint);
+    ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(sortKey, null);
+    BasicTableOutputFormat.setStorageInfo(job, zSchema, zStorageHint,
+        zSortInfo);
+    job.setNumReduceTasks(1);
+    job.submit();
+    job.waitForCompletion(true);
+    for (int i = 0; i < paths.length; ++i) {
+      Path tmpPath = new Path(paths[i], "_temporary");
+      FileSystem fileSys = tmpPath.getFileSystem(conf);
+      if (!fileSys.exists(tmpPath)) {
+        throw new RuntimeException("Temp dir should exist before BasicTableOutputFormat.close(): "
+            + tmpPath.toString());
+      }
+    }
+    BasicTableOutputFormat.close(job);
+    for (int i = 0; i < paths.length; ++i) {
+      Path tmpPath = new Path(paths[i], "_temporary");
+      FileSystem fileSys = tmpPath.getFileSystem(conf);
+      if (fileSys.exists(tmpPath)) {
+        throw new RuntimeException("Temp dir should not exist after BasicTableOutputFormat.close(): "
+            + tmpPath.toString());
+      }
+    }
+
+  }
+
+  public static void main(String[] args) throws ParseException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+    TestTempDirRemoval test = new TestTempDirRemoval();
+    TestTempDirRemoval.setUpOnce();
+
+    test.test4();
+  }
+}