Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/16 00:46:49 UTC

svn commit: r1625176 [2/9] - in /hive/branches/cbo: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ contrib/src/test/results/clientpositive/ data/conf/tez/ data/files/ itests/hive-unit/src/test/java/org/ap...

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Mon Sep 15 22:46:44 2014
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -43,8 +44,10 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -131,10 +134,18 @@ public class ReduceSinkOperator extends 
   // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
   protected transient Object[][] cachedKeys;
 
+  private StructField recIdField; // field to look for record identifier in
+  private StructField bucketField; // field to look for bucket in record identifier
+  private StructObjectInspector acidRowInspector; // row inspector used by acid options
+  private StructObjectInspector recIdInspector; // OI for the record identifier
+  private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
       List<ExprNodeDesc> keys = conf.getKeyCols();
+      LOG.debug("keys size is " + keys.size());
+      for (ExprNodeDesc k : keys) LOG.debug("Key exprNodeDesc " + k.getExprString());
       keyEval = new ExprNodeEvaluator[keys.size()];
       int i = 0;
       for (ExprNodeDesc e : keys) {
@@ -259,6 +270,20 @@ public class ReduceSinkOperator extends 
         // TODO: this is fishy - we init object inspectors based on first tag. We
         //       should either init for each tag, or if rowInspector doesn't really
         //       matter, then we can create this in ctor and get rid of firstRow.
+        if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+            conf.getWriteType() == AcidUtils.Operation.DELETE) {
+          assert rowInspector instanceof StructObjectInspector :
+              "Expected rowInspector to be an instance of StructObjectInspector but it is a " +
+                  rowInspector.getClass().getName();
+          acidRowInspector = (StructObjectInspector)rowInspector;
+          // The record identifier is always in the first column
+          recIdField = acidRowInspector.getAllStructFieldRefs().get(0);
+          recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
+          // The bucket field is in the second position
+          bucketField = recIdInspector.getAllStructFieldRefs().get(1);
+          bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
+        }
+
         LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys());
         keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
             distinctColIndices,
@@ -283,6 +308,11 @@ public class ReduceSinkOperator extends 
       if (bucketEval != null) {
         buckNum = computeBucketNumber(row, conf.getNumBuckets());
         cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
+      } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+          conf.getWriteType() == AcidUtils.Operation.DELETE) {
+        // In the non-partitioned case we still want to compute the bucket number for updates and
+        // deletes.
+        buckNum = computeBucketNumber(row, conf.getNumBuckets());
       }
 
       HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
@@ -339,9 +369,20 @@ public class ReduceSinkOperator extends 
 
   private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
     int buckNum = 0;
-    for (int i = 0; i < bucketEval.length; i++) {
-      Object o = bucketEval[i].evaluate(row);
-      buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+
+    if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+        conf.getWriteType() == AcidUtils.Operation.DELETE) {
+      // We don't need to evaluate the hash code.  Instead, read the bucket number directly from
+      // the row.  No expressions need to be evaluated, since we are reading the ROW__ID
+      // column directly.
+      Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
+      buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+      LOG.debug("Acid choosing bucket number " + buckNum);
+    } else {
+      for (int i = 0; i < bucketEval.length; i++) {
+        Object o = bucketEval[i].evaluate(row);
+        buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+      }
     }
 
     if (buckNum < 0) {
@@ -385,14 +426,19 @@ public class ReduceSinkOperator extends 
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
+      // If no partition cols and not doing an update or delete, just distribute the data uniformly
+      // to provide better load balance. If the requirement is to have a single reducer, we should
+      // set the number of reducers to 1. Use a constant seed to make the code deterministic.
+      // For acid operations make sure to send all records with the same key to the same
+      // FileSinkOperator, as the RecordUpdater interface can't manage multiple writers for a file.
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+        if (random == null) {
+          random = new Random(12345);
+        }
+        keyHashCode = random.nextInt();
+      } else {
+        keyHashCode = 1;
       }
-      keyHashCode = random.nextInt();
     } else {
       for (int i = 0; i < partitionEval.length; i++) {
         Object o = partitionEval[i].evaluate(row);
@@ -400,6 +446,7 @@ public class ReduceSinkOperator extends 
             + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
       }
     }
+    LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum));
     return buckNum < 0  ? keyHashCode : keyHashCode * 31 + buckNum;
   }
 

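For reference, a minimal self-contained sketch of the two bucket-number paths this hunk introduces: for ACID UPDATE/DELETE the bucket id is read out of the ROW__ID record identifier, otherwise the bucket columns are hashed with the usual 31-based combiner. BucketNumberSketch, RowId and the final modulo step are illustrative assumptions (plain JDK types), not code from this patch.

    import java.util.List;

    public class BucketNumberSketch {

      // Hypothetical stand-in for the ROW__ID record identifier: (transactionId, bucketId, rowId).
      public static final class RowId {
        final long transactionId;
        final int bucketId;
        final long rowId;
        public RowId(long transactionId, int bucketId, long rowId) {
          this.transactionId = transactionId;
          this.bucketId = bucketId;
          this.rowId = rowId;
        }
      }

      // UPDATE/DELETE path: the bucket number comes straight from the record identifier,
      // no expressions are evaluated.
      public static int bucketForAcid(RowId recordId) {
        return recordId.bucketId;
      }

      // Plain insert path: combine the hash codes of the bucket columns with a 31-based
      // rolling hash, then fold the result into [0, numBuckets).
      public static int bucketForInsert(List<Integer> bucketColumnHashes, int numBuckets) {
        int buckNum = 0;
        for (int h : bucketColumnHashes) {
          buckNum = buckNum * 31 + h;
        }
        if (buckNum < 0) {
          buckNum = -buckNum;  // illustrative negative-hash correction
        }
        return numBuckets > 0 ? buckNum % numBuckets : buckNum;
      }
    }
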
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Mon Sep 15 22:46:44 2014
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
@@ -94,8 +94,8 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
     taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
     taskvec.add(new TaskTuple<ColumnStatsUpdateWork>(ColumnStatsUpdateWork.class, ColumnStatsUpdateTask.class));
-    taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
-        MergeTask.class));
+    taskvec.add(new TaskTuple<MergeFileWork>(MergeFileWork.class,
+        MergeFileTask.class));
     taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
         DependencyCollectionTask.class));
     taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Sep 15 22:46:44 2014
@@ -18,67 +18,11 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.ExceptionListener;
-import java.beans.Expression;
-import java.beans.PersistenceDelegate;
-import java.beans.Statement;
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLTransientException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
 import org.antlr.runtime.CommonToken;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
@@ -122,9 +66,8 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
-import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -181,11 +124,66 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.PersistenceDelegate;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
 /**
  * Utilities.
@@ -352,9 +350,8 @@ public final class Utilities {
         if(MAP_PLAN_NAME.equals(name)){
           if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
             gWork = deserializePlan(in, MapWork.class, conf);
-          } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
-              OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-            gWork = deserializePlan(in, MergeWork.class, conf);
+          } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+            gWork = deserializePlan(in, MergeFileWork.class, conf);
           } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
             gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
           } else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Mon Sep 15 22:46:44 2014
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Sep 15 22:46:44 2014
@@ -17,22 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.auth.login.LoginException;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -54,6 +41,9 @@ import org.apache.hadoop.hive.ql.io.Comb
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -68,6 +58,7 @@ import org.apache.hadoop.hive.shims.Hado
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -113,9 +104,20 @@ import org.apache.tez.runtime.library.co
 import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -212,6 +214,16 @@ public class DagUtils {
     conf.set("mapred.mapper.class", ExecMapper.class.getName());
     conf.set("mapred.input.format.class", inpFormat);
 
+    if (mapWork instanceof MergeFileWork) {
+      MergeFileWork mfWork = (MergeFileWork) mapWork;
+      // This mapper class is used for serialization/deserialization of merge
+      // file work.
+      conf.set("mapred.mapper.class", MergeFileMapper.class.getName());
+      conf.set("mapred.input.format.class", mfWork.getInputformat());
+      conf.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
+          FileOutputFormat.class);
+    }
+
     return conf;
   }
 
@@ -486,6 +498,21 @@ public class DagUtils {
       }
     }
 
+    if (mapWork instanceof MergeFileWork) {
+      Path outputPath = ((MergeFileWork) mapWork).getOutputDir();
+      // prepare the tmp output directory. The output tmp directory should
+      // exist before jobClose (before renaming after job completion)
+      Path tempOutPath = Utilities.toTempPath(outputPath);
+      try {
+        if (!fs.exists(tempOutPath)) {
+          fs.mkdirs(tempOutPath);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Can't make path " + outputPath + " : " + e.getMessage());
+      }
+    }
+
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
 
@@ -515,9 +542,13 @@ public class DagUtils {
     }
 
     UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
-    map = Vertex.create(mapWork.getName(),
-        ProcessorDescriptor.create(MapTezProcessor.class.getName()).
-        setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+    String procClassName = MapTezProcessor.class.getName();
+    if (mapWork instanceof MergeFileWork) {
+      procClassName = MergeFileTezProcessor.class.getName();
+    }
+    map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
+        .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
@@ -784,7 +815,7 @@ public class DagUtils {
   }
 
   /**
-   * @param path - the path from which we try to determine the resource base name
+   * @param path - the string from which we try to determine the resource base name
    * @return the name of the resource from a given path string.
    */
   public String getResourceBaseName(Path path) {
@@ -831,7 +862,8 @@ public class DagUtils {
             conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
                 HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
         long sleepInterval = HiveConf.getTimeVar(
-            conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
+            conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL,
+            TimeUnit.MILLISECONDS);
         LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
             + sleepInterval);
         boolean found = false;

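A minimal sketch of what the DagUtils changes above do for MergeFileWork: point the job conf at the merge-file mapper/input/output classes and make sure the temporary output directory exists before jobClose renames into it. MergeFileVertexSketch is hypothetical; the Hive class names are written as strings only so the sketch compiles without the merge classes on the classpath.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class MergeFileVertexSketch {

      // Configure the conf for a merge-file vertex, mirroring the hunk above.
      public static void configureForMergeFile(JobConf conf, String mergeInputFormatClass) {
        conf.set("mapred.mapper.class", "org.apache.hadoop.hive.ql.io.merge.MergeFileMapper");
        conf.set("mapred.input.format.class", mergeInputFormatClass);
        conf.set("mapred.output.format.class",
            "org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat");
      }

      // The tmp output directory must exist before jobClose renames into it.
      public static void ensureTempOutputDir(FileSystem fs, Path tempOutPath) throws IOException {
        if (!fs.exists(tempOutPath)) {
          fs.mkdirs(tempOutPath);
        }
      }
    }
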
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Mon Sep 15 22:46:44 2014
@@ -16,13 +16,8 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hive.ql.exec.tez;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
@@ -34,8 +29,12 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -66,7 +65,7 @@ public abstract class RecordProcessor  {
   /**
    * Common initialization code for RecordProcessors
    * @param jconf
-   * @param processorContext the {@link TezProcessorContext}
+   * @param processorContext the {@link ProcessorContext}
    * @param mrReporter
    * @param inputs map of Input names to {@link LogicalInput}s
    * @param outputs map of Output names to {@link LogicalOutput}s

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Mon Sep 15 22:46:44 2014
@@ -17,12 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +34,11 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -51,13 +50,15 @@ public class TezProcessor extends Abstra
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
   protected boolean isMap = false;
 
-  RecordProcessor rproc = null;
+  protected RecordProcessor rproc = null;
 
-  private JobConf jobConf;
+  protected JobConf jobConf;
 
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
+  protected ProcessorContext processorContext;
+
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
   static {
@@ -121,9 +122,6 @@ public class TezProcessor extends Abstra
   public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
       throws Exception {
 
-    Throwable originalThrowable = null;
-
-    try{
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
       // in case of broadcast-join read the broadcast edge inputs
       // (possibly asynchronously)
@@ -142,14 +140,23 @@ public class TezProcessor extends Abstra
         rproc = new ReduceRecordProcessor();
       }
 
+      initializeAndRunProcessor(inputs, outputs);
+  }
+
+  protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs)
+      throws Exception {
+    Throwable originalThrowable = null;
+    try {
       TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
       // Start the actual Inputs. After MRInput initialization.
-      for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+      for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
         if (!cacheAccess.isInputCached(inputEntry.getKey())) {
           LOG.info("Input: " + inputEntry.getKey() + " is not cached");
           inputEntry.getValue().start();
         } else {
-          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+          LOG.info("Input: " + inputEntry.getKey() +
+              " is already cached. Skipping start");
         }
       }
 
@@ -170,7 +177,7 @@ public class TezProcessor extends Abstra
       }
 
       try {
-        if(rproc != null){
+        if (rproc != null) {
           rproc.close();
         }
       } catch (Throwable t) {

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java Mon Sep 15 22:46:44 2014
@@ -48,6 +48,11 @@ public class ReadEntity extends Entity i
   // is marked as being read.  Defaults to true as that is the most common case.
   private boolean needsLock = true;
 
+  // When true indicates that this object is being read as part of an update or delete.  This is
+  // important because in that case we shouldn't acquire a lock for it or authorize the read.
+  // These will be handled by the output to the table instead.
+  private boolean isUpdateOrDelete = false;
+
   // For views, the entities can be nested - by default, entities are at the top level
   private final Set<ReadEntity> parents = new HashSet<ReadEntity>();
 
@@ -166,4 +171,12 @@ public class ReadEntity extends Entity i
   public List<String> getAccessedColumns() {
     return accessedColumns;
   }
+
+  public void setUpdateOrDelete(boolean isUpdateOrDelete) {
+    this.isUpdateOrDelete = isUpdateOrDelete;
+  }
+
+  public boolean isUpdateOrDelete() {
+    return isUpdateOrDelete;
+  }
 }

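A hedged usage sketch of the new isUpdateOrDelete flag: inputs read as part of an update or delete are skipped when handing out shared read locks, because the matching WriteEntity takes a write lock instead (see the DbTxnManager hunk later in this message). SharedLockFilterSketch is a hypothetical helper; only needsLock() and isUpdateOrDelete() from this commit are used.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.hooks.ReadEntity;

    public class SharedLockFilterSketch {

      // Keep only the inputs that should still receive a shared read lock.
      public static List<ReadEntity> inputsNeedingSharedLocks(Iterable<ReadEntity> inputs) {
        List<ReadEntity> result = new ArrayList<ReadEntity>();
        for (ReadEntity input : inputs) {
          if (!input.needsLock() || input.isUpdateOrDelete()) {
            continue;  // write side handles locking for update/delete reads
          }
          result.add(input);
        }
        return result;
      }
    }
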
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java Mon Sep 15 22:46:44 2014
@@ -148,6 +148,16 @@ public class WriteEntity extends Entity 
   }
 
   /**
+   * Only use this if you are very sure of what you are doing.  This is used by the
+   * {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer} to reset the types to
+   * update or delete after rewriting and reparsing the queries.
+   * @param type new operation type
+   */
+  public void setWriteType(WriteType type) {
+    writeType = type;
+  }
+
+  /**
    * Equals function.
    */
   @Override

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Sep 15 22:46:44 2014
@@ -26,14 +26,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -42,24 +40,40 @@ import java.util.regex.Pattern;
  * are used by the compactor and cleaner and thus must be format agnostic.
  */
 public class AcidUtils {
-  private AcidUtils() {
-    // NOT USED
-  }
-  private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
-
   public static final String BASE_PREFIX = "base_";
   public static final String DELTA_PREFIX = "delta_";
+  public static final PathFilter deltaFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(DELTA_PREFIX);
+    }
+  };
   public static final String BUCKET_PREFIX = "bucket_";
-
+  public static final PathFilter bucketFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(BUCKET_PREFIX);
+    }
+  };
   public static final String BUCKET_DIGITS = "%05d";
   public static final String DELTA_DIGITS = "%07d";
+  public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
+  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+  public static final PathFilter originalBucketFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return ORIGINAL_PATTERN.matcher(path.getName()).matches();
+    }
+  };
+
+  private AcidUtils() {
+    // NOT USED
+  }
+  private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
 
   private static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
 
-  public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
-  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
-
   public static final PathFilter hiddenFileFilter = new PathFilter(){
     public boolean accept(Path p){
       String name = p.getName();
@@ -67,13 +81,6 @@ public class AcidUtils {
     }
   };
 
-  public static final PathFilter bucketFileFilter = new PathFilter() {
-    @Override
-    public boolean accept(Path path) {
-      return path.getName().startsWith(BUCKET_PREFIX);
-    }
-  };
-
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   /**
@@ -149,7 +156,7 @@ public class AcidUtils {
           .minimumTransactionId(0)
           .maximumTransactionId(0)
           .bucket(bucket);
-    } else if (filename.startsWith(AcidUtils.BUCKET_PREFIX)) {
+    } else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
           Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
       result
@@ -372,7 +379,8 @@ public class AcidUtils {
     }
 
     final Path base = bestBase == null ? null : bestBase.getPath();
-    LOG.debug("base = " + base + " deltas = " + deltas.size());
+    LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
+        deltas.size());
 
     return new Directory(){
 

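A small sketch of the directory-layout conventions the AcidUtils changes rely on: delta and bucket files are recognized purely by name prefix, and the bucket id is the trailing five-digit group. AcidPathSketch and bucketIdFromName are illustrative helpers, not part of the patch; the prefixes and regex mirror the constants above.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class AcidPathSketch {

      // Deltas are recognized purely by file-name prefix, keeping the filter format agnostic.
      public static final PathFilter DELTA_FILTER = new PathFilter() {
        @Override
        public boolean accept(Path path) {
          return path.getName().startsWith("delta_");
        }
      };

      public static final Pattern BUCKET_DIGITS = Pattern.compile("[0-9]{5}$");

      // Pull the five-digit bucket id off the end of a name such as "bucket_00003".
      public static int bucketIdFromName(String fileName) {
        Matcher m = BUCKET_DIGITS.matcher(fileName);
        if (!m.find()) {
          throw new IllegalArgumentException("No bucket digits found in " + fileName);
        }
        return Integer.parseInt(m.group());
      }
    }
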
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java Mon Sep 15 22:46:44 2014
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
-public class OrcFileStripeMergeInputFormat extends MergeInputFormat {
+import java.io.IOException;
+
+public class OrcFileStripeMergeInputFormat extends MergeFileInputFormat {
 
   @Override
   public RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> getRecordReader(

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java Mon Sep 15 22:46:44 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 /**
  * The interface for writing ORC files.
@@ -72,4 +73,30 @@ public interface Writer {
    * @return the offset that would be a valid end location for an ORC file
    */
   long writeIntermediateFooter() throws IOException;
+
+  /**
+   * Fast stripe append to ORC file. This interface is used for fast ORC file
+   * merge with other ORC files. When merging, the file to be merged should pass
+   * stripe in binary form along with stripe information and stripe statistics.
+   * merge with other ORC files. When merging, the file to be merged should pass
+   * each stripe in binary form along with its stripe information and stripe statistics.
+   * @param stripe - stripe as byte array
+   * @param offset - offset within byte array
+   * @param length - length of stripe within byte array
+   * @param stripeInfo - stripe information
+   * @param stripeStatistics - stripe statistics (Protobuf objects can be
+   *                         merged directly)
+   * @throws IOException
+   */
+  public void appendStripe(byte[] stripe, int offset, int length,
+      StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException;
+
+  /**
+   * When fast stripe append is used for merging ORC stripes, after appending
+   * the last stripe from a file, this interface must be used to merge any
+   * user metadata.
+   * @param userMetadata - user metadata
+   */
+  public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata);
 }

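A hedged usage sketch of the fast stripe append contract described in the new javadoc: append every stripe of a source file, then append that file's user metadata last. Only the appendStripe/appendUserMetadata signatures added in this commit are used; RawStripe is a hypothetical holder, and how the stripe bytes, StripeInformation and statistics are read from the source file is out of scope here.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hive.ql.io.orc.OrcProto;
    import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
    import org.apache.hadoop.hive.ql.io.orc.Writer;

    public class FastStripeMergeSketch {

      // One already-read stripe of a source file (hypothetical holder).
      public static final class RawStripe {
        final byte[] data;
        final StripeInformation info;
        final OrcProto.StripeStatistics stats;
        public RawStripe(byte[] data, StripeInformation info, OrcProto.StripeStatistics stats) {
          this.data = data;
          this.info = info;
          this.stats = stats;
        }
      }

      // Append all stripes of one source file, then its user metadata.
      public static void appendFile(Writer writer, List<RawStripe> stripes,
          List<OrcProto.UserMetadataItem> userMetadata) throws IOException {
        for (RawStripe s : stripes) {
          writer.appendStripe(s.data, 0, s.data.length, s.info, s.stats);
        }
        writer.appendUserMetadata(userMetadata);
      }
    }
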
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Mon Sep 15 22:46:44 2014
@@ -29,6 +29,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -74,10 +78,17 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -2316,17 +2327,19 @@ class WriterImpl implements Writer, Memo
     return rawWriter.getPos();
   }
 
-  void appendStripe(byte[] stripe, StripeInformation stripeInfo,
-      OrcProto.StripeStatistics stripeStatistics) throws IOException {
-    appendStripe(stripe, 0, stripe.length, stripeInfo, stripeStatistics);
-  }
-
-  void appendStripe(byte[] stripe, int offset, int length,
+  @Override
+  public void appendStripe(byte[] stripe, int offset, int length,
       StripeInformation stripeInfo,
       OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    checkArgument(stripe != null, "Stripe must not be null");
+    checkArgument(length <= stripe.length,
+        "Specified length must not be greater than the specified array length");
+    checkArgument(stripeInfo != null, "Stripe information must not be null");
+    checkArgument(stripeStatistics != null,
+        "Stripe statistics must not be null");
+
     getStream();
     long start = rawWriter.getPos();
-
     long stripeLen = length;
     long availBlockSpace = blockSize - (start % blockSize);
 
@@ -2382,7 +2395,8 @@ class WriterImpl implements Writer, Memo
     }
   }
 
-  void appendUserMetadata(List<UserMetadataItem> userMetadata) {
+  @Override
+  public void appendUserMetadata(List<UserMetadataItem> userMetadata) {
     if (userMetadata != null) {
       for (UserMetadataItem item : userMetadata) {
         this.userMetadata.put(item.getName(), item.getValue());

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java Mon Sep 15 22:46:44 2014
@@ -20,14 +20,14 @@ package org.apache.hadoop.hive.ql.io.rcf
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
-public class RCFileBlockMergeInputFormat extends MergeInputFormat {
+public class RCFileBlockMergeInputFormat extends MergeFileInputFormat {
 
   @Override
   public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper>

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Mon Sep 15 22:46:44 2014
@@ -53,11 +53,12 @@ public class DbTxnManager extends HiveTx
   }
 
   @Override
-  public void openTxn(String user) throws LockException {
+  public long openTxn(String user) throws LockException {
     init();
     try {
       txnId = client.openTxn(user);
       LOG.debug("Opened txn " + txnId);
+      return txnId;
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -88,7 +89,11 @@ public class DbTxnManager extends HiveTx
 
     // For each source to read, get a shared lock
     for (ReadEntity input : plan.getInputs()) {
-      if (!input.needsLock()) continue;
+      if (!input.needsLock() || input.isUpdateOrDelete()) {
+        // We don't want to acquire read locks during update or delete as we'll be acquiring write
+        // locks instead.
+        continue;
+      }
       LockComponentBuilder compBuilder = new LockComponentBuilder();
       compBuilder.setShared();
 
@@ -297,6 +302,11 @@ public class DbTxnManager extends HiveTx
   }
 
   @Override
+  public boolean supportsAcid() {
+    return true;
+  }
+
+  @Override
   protected void destruct() {
     try {
       if (txnId > 0) rollbackTxn();

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java Mon Sep 15 22:46:44 2014
@@ -48,8 +48,9 @@ class DummyTxnManager extends HiveTxnMan
   private HiveLockManager lockMgr;
 
   @Override
-  public void openTxn(String user) throws LockException {
+  public long openTxn(String user) throws LockException {
     // No-op
+    return 0L;
   }
 
   @Override
@@ -208,6 +209,11 @@ class DummyTxnManager extends HiveTxnMan
     return false;
   }
 
+  @Override
+  public boolean supportsAcid() {
+    return false;
+  }
+
 
   protected void destruct() {
     if (lockMgr != null) {

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java Mon Sep 15 22:46:44 2014
@@ -32,9 +32,10 @@ public interface HiveTxnManager {
   /**
    * Open a new transaction.
    * @param user Hive user who is opening this transaction.
+   * @return The new transaction id
    * @throws LockException if a transaction is already open.
    */
-  void openTxn(String user) throws LockException;
+  long openTxn(String user) throws LockException;
 
   /**
    * Get the lock manager.  This must be used rather than instantiating an
@@ -120,4 +121,10 @@ public interface HiveTxnManager {
    * @return true if the new format should be used.
    */
   boolean useNewShowLocksFormat();
+
+  /**
+   * Indicate whether this transaction manager supports ACID operations.
+   * @return true if this transaction manager does ACID
+   */
+  boolean supportsAcid();
 }

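A short usage sketch of the changed HiveTxnManager interface: openTxn(user) now returns the new transaction id, and supportsAcid() lets callers branch on whether the configured manager can run ACID operations. TxnManagerUsageSketch and its zero fallback are illustrative only.

    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.LockException;

    public class TxnManagerUsageSketch {

      // Open a transaction and capture its id, but only when the manager supports ACID.
      public static long openIfAcid(HiveTxnManager txnMgr, String user) throws LockException {
        if (!txnMgr.supportsAcid()) {
          return 0L;  // DummyTxnManager-style no-op
        }
        return txnMgr.openTxn(user);  // now returns the new transaction id
      }
    }
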
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Sep 15 22:46:44 2014
@@ -96,6 +96,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -1227,7 +1228,7 @@ public class Hive {
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal) throws HiveException {
+      boolean isSrcLocal, boolean isAcid) throws HiveException {
     Table tbl = getTable(tableName);
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
@@ -1275,7 +1276,7 @@ public class Hive {
             isSrcLocal);
       } else {
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal);
+        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
       }
 
       // recreate the partition if it existed before
@@ -1407,7 +1408,7 @@ private void constructOneLBLocationMap(F
    */
   public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean holdDDLTime, boolean listBucketingEnabled)
+      int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
       throws HiveException {
 
     Set<Path> validPartitions = new HashSet<Path>();
@@ -1463,7 +1464,7 @@ private void constructOneLBLocationMap(F
 
         // finally load the partition -- move the file to the final table address
         loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
-            listBucketingEnabled, false);
+            listBucketingEnabled, false, isAcid);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
       return fullPartSpecs;
@@ -1489,14 +1490,16 @@ private void constructOneLBLocationMap(F
    *          If the source directory is LOCAL
    * @param isSkewedStoreAsSubdir
    *          if list bucketing enabled
+   * @param isAcid true if this is an ACID based write
    */
   public void loadTable(Path loadPath, String tableName, boolean replace,
-      boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir) throws HiveException {
+      boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid)
+      throws HiveException {
     Table tbl = getTable(tableName);
     if (replace) {
       tbl.replaceFiles(loadPath, isSrcLocal);
     } else {
-      tbl.copyFiles(loadPath, isSrcLocal);
+      tbl.copyFiles(loadPath, isSrcLocal, isAcid);
     }
 
     try {
@@ -2313,8 +2316,19 @@ private void constructOneLBLocationMap(F
     return success;
   }
 
+  /**
+   * Copy files.  This handles building the mapping for buckets and such between the source and
+   * destination.
+   * @param conf Configuration object
+   * @param srcf source directory, if bucketed should contain bucket files
+   * @param destf directory to move files into
+   * @param fs Filesystem
+   * @param isSrcLocal true if source is on local file system
+   * @param isAcid true if this is an ACID based write
+   * @throws HiveException
+   */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean isSrcLocal) throws HiveException {
+      FileSystem fs, boolean isSrcLocal, boolean isAcid) throws HiveException {
     boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
@@ -2342,23 +2356,105 @@ private void constructOneLBLocationMap(F
       return;
       // srcs = new FileStatus[0]; Why is this needed?
     }
+
+    // If we're moving files around for an ACID write then the rules and paths are all different.
+    // You can blame this on Owen.
+    if (isAcid) {
+      moveAcidFiles(srcFs, srcs, destf);
+    } else {
     // check that source and target paths exist
-    List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
-    // move it, move it
-    try {
-      for (List<Path[]> sdpairs : result) {
-        for (Path[] sdpair : sdpairs) {
-          if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
-            throw new IOException("Cannot move " + sdpair[0] + " to "
-                + sdpair[1]);
+      List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
+      // move it, move it
+      try {
+        for (List<Path[]> sdpairs : result) {
+          for (Path[] sdpair : sdpairs) {
+            if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
+              throw new IOException("Cannot move " + sdpair[0] + " to "
+                  + sdpair[1]);
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new HiveException("copyFiles: error while moving files!!!", e);
+      }
+    }
+  }
+
+  private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst)
+      throws HiveException {
+    // The layout for ACID files is table|partname/base|delta/bucket
+    // We will always only be writing delta files.  In the buckets created by FileSinkOperator
+    // it will look like bucket/delta/bucket.  So we need to move that into the above structure.
+    // For the first mover there will be no delta directory, so we can move the whole directory.
+    // For everyone else we will need to just move the buckets under the existing delta
+    // directory.
+
+    Set<Path> createdDeltaDirs = new HashSet<Path>();
+    // Open the original path we've been given and find the list of original buckets
+    for (FileStatus stat : stats) {
+      Path srcPath = stat.getPath();
+
+      LOG.debug("Acid move looking for original buckets in " + srcPath);
+
+      FileStatus[] origBucketStats = null;
+      try {
+        origBucketStats = fs.listStatus(srcPath, AcidUtils.originalBucketFilter);
+      } catch (IOException e) {
+        String msg = "Unable to look for bucket files in src path " + srcPath.toUri().toString();
+        LOG.error(msg);
+        throw new HiveException(msg, e);
+      }
+      LOG.debug("Acid move found " + origBucketStats.length + " original buckets");
+
+      for (FileStatus origBucketStat : origBucketStats) {
+        Path origBucketPath = origBucketStat.getPath();
+        LOG.debug("Acid move looking for delta files in bucket " + origBucketPath);
+
+        FileStatus[] deltaStats = null;
+        try {
+          deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter);
+        } catch (IOException e) {
+          throw new HiveException("Unable to look for delta files in original bucket " +
+              origBucketPath.toUri().toString(), e);
+        }
+        LOG.debug("Acid move found " + deltaStats.length + " delta files");
+
+        for (FileStatus deltaStat : deltaStats) {
+          Path deltaPath = deltaStat.getPath();
+          // Create the delta directory.  Don't worry if it already exists,
+          // as that likely means another task got to it first.  Then move each of the buckets.
+          // It would be more efficient to try to move the delta with its buckets, but that is
+          // harder to make race condition proof.
+          Path deltaDest = new Path(dst, deltaPath.getName());
+          try {
+            if (!createdDeltaDirs.contains(deltaDest)) {
+              try {
+                fs.mkdirs(deltaDest);
+                createdDeltaDirs.add(deltaDest);
+              } catch (IOException swallowIt) {
+                // Don't worry about this, as it likely just means it's already been created.
+                LOG.info("Unable to create delta directory " + deltaDest +
+                    ", assuming it already exists: " + swallowIt.getMessage());
+              }
+            }
+            FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+            LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+            for (FileStatus bucketStat : bucketStats) {
+              Path bucketSrc = bucketStat.getPath();
+              Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+              LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+                  bucketDest.toUri().toString());
+              fs.rename(bucketSrc, bucketDest);
+            }
+          } catch (IOException e) {
+            throw new HiveException("Error moving acid files", e);
           }
         }
       }
-    } catch (IOException e) {
-      throw new HiveException("copyFiles: error while moving files!!!", e);
     }
   }
 
+
   /**
    * Replaces files in the partition with new data set specified by srcf. Works
    * by renaming directory of srcf to the destination file.

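A minimal sketch, not taken from the patch, of the directory reshaping the new moveAcidFiles performs. It uses plain FileSystem listings where the real code applies AcidUtils.originalBucketFilter, deltaFileFilter and bucketFileFilter, and the class and method names below are illustrative only:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AcidMoveSketch {
  // Moves src/origBucket/delta_x_y/bucket_N to dst/delta_x_y/bucket_N.
  static void move(FileSystem fs, Path src, Path dst) throws IOException {
    for (FileStatus origBucket : fs.listStatus(src)) {
      for (FileStatus delta : fs.listStatus(origBucket.getPath())) {
        Path deltaDest = new Path(dst, delta.getPath().getName());
        fs.mkdirs(deltaDest); // behaves like mkdir -p, so a pre-existing delta dir is fine
        for (FileStatus bucket : fs.listStatus(delta.getPath())) {
          fs.rename(bucket.getPath(), new Path(deltaDest, bucket.getPath().getName()));
        }
      }
    }
  }
}

Renaming bucket files one at a time under a shared delta directory, rather than renaming whole delta directories, is what keeps concurrent movers from colliding, as the comment in the patch notes.
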
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Mon Sep 15 22:46:44 2014
@@ -659,12 +659,14 @@ public class Table implements Serializab
    *          Files to be moved. Leaf directories or globbed file paths
    * @param isSrcLocal
    *          If the source directory is LOCAL
+   * @param isAcid
+   *          True if this is an ACID-based insert, update, or delete
    */
-  protected void copyFiles(Path srcf, boolean isSrcLocal) throws HiveException {
+  protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid) throws HiveException {
     FileSystem fs;
     try {
       fs = getDataLocation().getFileSystem(Hive.get().getConf());
-      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal);
+      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid);
     } catch (IOException e) {
       throw new HiveException("addFiles: filesystem error in check phase", e);
     }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Mon Sep 15 22:46:44 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -371,6 +372,12 @@ public class BucketingSortingReduceSinkO
         return null;
       }
 
+      // Don't do this optimization with updates or deletes
+      if (pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.UPDATE ||
+          pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.DELETE){
+        return null;
+      }
+
       // Support for dynamic partitions can be added later
       if (fsOp.getConf().getDynPartCtx() != null) {
         return null;

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Sep 15 22:46:44 2014
@@ -18,20 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -53,6 +39,8 @@ import org.apache.hadoop.hive.ql.exec.No
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -65,8 +53,10 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -88,6 +78,7 @@ import org.apache.hadoop.hive.ql.plan.Co
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
@@ -96,8 +87,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -106,6 +99,22 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
 
 /**
  * General utility common functions for the Processor to convert operator into
@@ -1250,33 +1259,20 @@ public final class GenMapRedUtils {
         (conf.getBoolVar(ConfVars.HIVEMERGEORCFILESTRIPELEVEL) &&
             fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class))) {
 
-      // Check if InputFormatClass is valid
-      final String inputFormatClass;
-      if (fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
-        inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+      cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName,
+          dpCtx != null && dpCtx.getNumDPCols() > 0);
+      if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+        cplan.setName("Tez Merge File Work");
+        ((TezWork) work).add(cplan);
       } else {
-        inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATSTRIPELEVEL);
-      }
-      try {
-        Class c = Class.forName(inputFormatClass);
-
-        if(fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class)) {
-          LOG.info("OrcFile format - Using stripe level merge");
-        } else {
-          LOG.info("RCFile format- Using block level merge");
-        }
-        cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName,
-            dpCtx != null && dpCtx.getNumDPCols() > 0);
         work = cplan;
-      } catch (ClassNotFoundException e) {
-        String msg = "Illegal input format class: " + inputFormatClass;
-        throw new SemanticException(msg);
       }
     } else {
       cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
       if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
         work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
-        cplan.setName("Merge");
+        cplan.setName("Tez Merge File Work");
         ((TezWork)work).add(cplan);
       } else {
         work = new MapredWork();
@@ -1489,6 +1485,7 @@ public final class GenMapRedUtils {
    *
    * @param fsInputDesc
    * @param finalName
+   * @param hasDynamicPartitions whether the destination has dynamic partitions
    * @return MergeWork if table is stored as RCFile or ORCFile,
    *         null otherwise
    */
@@ -1498,38 +1495,62 @@ public final class GenMapRedUtils {
     Path inputDir = fsInputDesc.getFinalDirName();
     TableDesc tblDesc = fsInputDesc.getTableInfo();
 
-    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class) ||
-        tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) {
-      ArrayList<Path> inputDirs = new ArrayList<Path>(1);
-      ArrayList<String> inputDirstr = new ArrayList<String>(1);
-      if (!hasDynamicPartitions
-          && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
-        inputDirs.add(inputDir);
-        inputDirstr.add(inputDir.toString());
-      }
-
-      MergeWork work = new MergeWork(inputDirs, finalName,
-          hasDynamicPartitions, fsInputDesc.getDynPartCtx(),
-          tblDesc.getInputFileFormatClass());
-      LinkedHashMap<String, ArrayList<String>> pathToAliases =
-          new LinkedHashMap<String, ArrayList<String>>();
-      pathToAliases.put(inputDir.toString(), (ArrayList<String>) inputDirstr.clone());
-      work.setMapperCannotSpanPartns(true);
-      work.setPathToAliases(pathToAliases);
-      work.setAliasToWork(
-          new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
-      if (hasDynamicPartitions
-          || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
-        work.getPathToPartitionInfo().put(inputDir.toString(),
-            new PartitionDesc(tblDesc, null));
-      }
-      work.setListBucketingCtx(fsInputDesc.getLbCtx());
+    List<Path> inputDirs = new ArrayList<Path>(1);
+    ArrayList<String> inputDirstr = new ArrayList<String>(1);
+    // this will be populated by MergeFileWork.resolveDynamicPartitionStoredAsSubDirsMerge
+    // in case of dynamic partitioning and list bucketing
+    if (!hasDynamicPartitions &&
+        !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+      inputDirs.add(inputDir);
+    }
+    inputDirstr.add(inputDir.toString());
+
+    // internal input format class for CombineHiveInputFormat
+    final Class<? extends InputFormat> internalIFClass;
+    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+      internalIFClass = RCFileBlockMergeInputFormat.class;
+    } else if (tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) {
+      internalIFClass = OrcFileStripeMergeInputFormat.class;
+    } else {
+      throw new SemanticException("createMergeTask called on a table with file"
+          + " format other than RCFile or ORCFile");
+    }
 
-      return work;
+    // create the merge file work
+    MergeFileWork work = new MergeFileWork(inputDirs, finalName,
+        hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName());
+    LinkedHashMap<String, ArrayList<String>> pathToAliases =
+        new LinkedHashMap<String, ArrayList<String>>();
+    pathToAliases.put(inputDir.toString(), inputDirstr);
+    work.setMapperCannotSpanPartns(true);
+    work.setPathToAliases(pathToAliases);
+    PartitionDesc pDesc = new PartitionDesc(tblDesc, null);
+    pDesc.setInputFileFormatClass(internalIFClass);
+    work.getPathToPartitionInfo().put(inputDir.toString(), pDesc);
+    work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+    // create alias to work which contains the merge operator
+    LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
+        new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+    Operator<? extends OperatorDesc> mergeOp = null;
+    final FileMergeDesc fmd;
+    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+      fmd = new RCFileMergeDesc();
+    } else {
+      fmd = new OrcFileMergeDesc();
     }
+    fmd.setDpCtx(fsInputDesc.getDynPartCtx());
+    fmd.setOutputPath(finalName);
+    fmd.setHasDynamicPartitions(work.hasDynamicPartitions());
+    fmd.setListBucketingAlterTableConcatenate(work.isListBucketingAlterTableConcatenate());
+    int lbLevel = work.getListBucketingCtx() == null ? 0 :
+      work.getListBucketingCtx().calculateListBucketingLevel();
+    fmd.setListBucketingDepth(lbLevel);
+    mergeOp = OperatorFactory.get(fmd);
+    aliasToWork.put(inputDir.toString(), mergeOp);
+    work.setAliasToWork(aliasToWork);
 
-    throw new SemanticException("createMergeTask called on a table with file"
-        + " format other than RCFile or ORCFile");
+    return work;
   }
 
   /**

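Stepping back from the diff above: the crux of the rewritten createMergeTask is pairing the table's input format with an internal input format for the merge-only map work and with a FileMergeDesc subtype for the merge operator. A condensed sketch of that pairing, using only classes the patch already imports; the helper class and method names are illustrative and not part of Hive:

import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
import org.apache.hadoop.mapred.InputFormat;

class MergeFormatMapping {

  // Internal input format handed to the merge-only map work (consumed by CombineHiveInputFormat).
  static Class<? extends InputFormat> internalInputFormat(Class<?> tableInputFormat)
      throws SemanticException {
    if (RCFileInputFormat.class.equals(tableInputFormat)) {
      return RCFileBlockMergeInputFormat.class;   // RCFile merges at block level
    } else if (OrcInputFormat.class.equals(tableInputFormat)) {
      return OrcFileStripeMergeInputFormat.class; // ORC merges at stripe level
    }
    throw new SemanticException("createMergeTask called on a table with file"
        + " format other than RCFile or ORCFile");
  }

  // Merge descriptor that the patch feeds to OperatorFactory.get(...) to build the merge operator.
  static FileMergeDesc mergeDesc(Class<?> tableInputFormat) {
    return RCFileInputFormat.class.equals(tableInputFormat)
        ? new RCFileMergeDesc() : new OrcFileMergeDesc();
  }
}
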
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Sep 15 22:46:44 2014
@@ -118,7 +118,8 @@ public class Optimizer {
     }
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
-        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
+        !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       transformations.add(new CorrelationOptimizer());
     }
     if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Mon Sep 15 22:46:44 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -173,8 +175,22 @@ public class SortedDynPartitionOptimizer
           destTable.getCols());
       ObjectPair<List<Integer>, List<Integer>> sortOrderPositions = getSortPositionsOrder(
           destTable.getSortCols(), destTable.getCols());
-      List<Integer> sortPositions = sortOrderPositions.getFirst();
-      List<Integer> sortOrder = sortOrderPositions.getSecond();
+      List<Integer> sortPositions = null;
+      List<Integer> sortOrder = null;
+      if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
+          fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
+        // When doing updates and deletes we always want to sort on the rowid because the ACID
+        // reader will expect this sort order when doing reads.  So
+        // ignore whatever comes from the table and enforce this sort order instead.
+        sortPositions = Arrays.asList(0);
+        sortOrder = Arrays.asList(1); // 1 means ascending; an enum in the Thrift interface would be clearer here
+      } else {
+        sortPositions = sortOrderPositions.getFirst();
+        sortOrder = sortOrderPositions.getSecond();
+      }
+      LOG.debug("Got sort order");
+      for (int i : sortPositions) LOG.debug("sort position " + i);
+      for (int i : sortOrder) LOG.debug("sort order " + i);
       List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
       List<ColumnInfo> colInfos = parseCtx.getOpParseCtx().get(fsParent).getRowResolver()
           .getColumnInfos();
@@ -198,7 +214,7 @@ public class SortedDynPartitionOptimizer
         colExprMap.put(ci.getInternalName(), newValueCols.get(newValueCols.size() - 1));
       }
       ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
-          newValueCols, bucketColumns, numBuckets, fsParent);
+          newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
 
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
@@ -319,7 +335,7 @@ public class SortedDynPartitionOptimizer
     public ReduceSinkDesc getReduceSinkDesc(List<Integer> partitionPositions,
         List<Integer> sortPositions, List<Integer> sortOrder, ArrayList<ExprNodeDesc> newValueCols,
         ArrayList<ExprNodeDesc> bucketColumns, int numBuckets,
-        Operator<? extends OperatorDesc> parent) {
+        Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) {
 
       // Order of KEY columns
       // 1) Partition columns
@@ -409,7 +425,7 @@ public class SortedDynPartitionOptimizer
       // Number of reducers is set to default (-1)
       ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, newKeyCols.size(), newValueCols,
           outputKeyCols, distinctColumnIndices, outValColNames, -1, newPartCols, -1, keyTable,
-          valueTable);
+          valueTable, writeType);
       rsConf.setBucketCols(bucketColumns);
       rsConf.setNumBuckets(numBuckets);
 

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Mon Sep 15 22:46:44 2014
@@ -31,6 +31,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.antlr.runtime.tree.CommonTree;
 import org.antlr.runtime.tree.Tree;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -89,6 +91,13 @@ public abstract class BaseSemanticAnalyz
   protected HashMap<String, String> idToTableNameMap;
   protected QueryProperties queryProperties;
 
+  /**
+   * The FileSinkDescs of file sinks that write in an ACID-compliant way.  We need to remember
+   * them here because when we build them we don't yet know the transaction id; we go back and
+   * set it once we actually start running the query.
+   */
+  protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
+
   public static int HIVE_COLUMN_ORDER_ASC = 1;
   public static int HIVE_COLUMN_ORDER_DESC = 0;
 
@@ -943,6 +952,10 @@ public abstract class BaseSemanticAnalyz
     return queryProperties;
   }
 
+  public Set<FileSinkDesc> getAcidFileSinks() {
+    return acidFileSinks;
+  }
+
   /**
    * Construct list bucketing context.
    *

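The javadoc above explains that the transaction id is not known at analysis time. A hedged sketch of how a caller might later consume getAcidFileSinks(); the setTransactionId setter is an assumption used for illustration and is not shown anywhere in this diff:

import java.util.Set;

import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

class AcidSinkTxnStamper {
  // Push the now-known transaction id into every ACID file sink recorded during analysis.
  static void stamp(Set<FileSinkDesc> acidFileSinks, long txnId) {
    for (FileSinkDesc desc : acidFileSinks) {
      desc.setTransactionId(txnId); // hypothetical setter name; not confirmed by this diff
    }
  }
}
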
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java Mon Sep 15 22:46:44 2014
@@ -28,6 +28,7 @@ import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -70,7 +71,7 @@ public class ExportSemanticAnalyzer exte
                     "Target is not a directory : " + toURI));
         } else {
           FileStatus[] files = fs.listStatus(toPath);
-          if (files != null) {
+          if (files != null && files.length != 0) {
             throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
                           "Target is not an empty directory : " + toURI));
           }
@@ -120,6 +121,7 @@ public class ExportSemanticAnalyzer exte
       rootTasks.add(rTask);
       inputs.add(new ReadEntity(ts.tableHandle));
     }
-    outputs.add(new WriteEntity(parentPath, toURI.getScheme().equals("hdfs")));
+    boolean isLocal = FileUtils.isLocalFile(conf, toURI);
+    outputs.add(new WriteEntity(parentPath, isLocal));
   }
 }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java Mon Sep 15 22:46:44 2014
@@ -18,11 +18,24 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -35,21 +48,22 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
 /**
  * ImportSemanticAnalyzer.
  *
@@ -82,6 +96,8 @@ public class ImportSemanticAnalyzer exte
       List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
       Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(),
           fromURI.getPath());
+      boolean isLocal = FileUtils.isLocalFile(conf, fromURI);
+      inputs.add(new ReadEntity(fromPath, isLocal));
       try {
         Path metadataPath = new Path(fromPath, METADATA_NAME);
         Map.Entry<org.apache.hadoop.hive.metastore.api.Table,
@@ -475,7 +491,7 @@ public class ImportSemanticAnalyzer exte
       String importedSerdeFormat = tableDesc.getSerdeProps().get(
           serdeConstants.SERIALIZATION_FORMAT);
       /*
-       * If Imported SerdeFormat is null, then set it to "1" just as 
+       * If Imported SerdeFormat is null, then set it to "1" just as
        * metadata.Table.getEmptyTable
        */
       importedSerdeFormat = importedSerdeFormat == null ? "1" : importedSerdeFormat;

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1625176&r1=1625175&r2=1625176&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Mon Sep 15 22:46:44 2014
@@ -172,6 +172,15 @@ public class QBParseInfo {
     return insertIntoTables.contains(fullName.toLowerCase());
   }
 
+  /**
+   * Check whether a table is in the list of tables being inserted into.
+   * @param fullTableName table name in dbname.tablename format
+   * @return true if the table is a target of an INSERT INTO in this query
+   */
+  public boolean isInsertIntoTable(String fullTableName) {
+    return insertIntoTables.contains(fullTableName.toLowerCase());
+  }
+
   public HashMap<String, ASTNode> getAggregationExprsForClause(String clause) {
     return destToAggregationExprs.get(clause);
   }