Posted to commits@hive.apache.org by se...@apache.org on 2017/10/13 00:15:57 UTC

[18/50] [abbrv] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --cc common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ff8a7aa,e4b09a2..21c4cc9
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@@ -1864,14 -1855,9 +1857,16 @@@ public class HiveConf extends Configura
          " of the lock manager is dumped to log file.  This is for debugging.  See also " +
          "hive.lock.numretries and hive.lock.sleep.between.retries."),
  
-     HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
+     HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
 -        "This is intended to be used as an internal property for future versions of ACID. (See\n" +
 -        "HIVE-14035 for details.)"),
 +        "Sets the operational properties that control the appropriate behavior for various\n"
 +        + "versions of the Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
 +        + "for future versions of ACID. (See HIVE-14035 for details.)\n"
 +        + "0: Turn on the legacy mode for ACID\n"
 +        + "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n"
 +        + "2: Hash-based merge, which combines delta files using GRACE hash join based approach (not implemented)\n"
-         + "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing."),
++        + "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n"
++        + "This is intended to be used as an internal property for future versions of ACID. (See\n" +
++          "HIVE-14035 for details.)"),
  
      HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
          "current open transactions reach this limit, future open transaction requests will be \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 913e333,5812a1b..ab5f7b7
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@@ -192,13 -194,14 +197,13 @@@ public class HiveMetaStore extends Thri
      };
    };
  
 -  /**
 -   * default port on which to start the Hive server
 -   */
    public static final String ADMIN = "admin";
    public static final String PUBLIC = "public";
 +  /** MM write states. */
 +  public static final char MM_WRITE_OPEN = 'o', MM_WRITE_COMMITTED = 'c', MM_WRITE_ABORTED = 'a';
  
    private static HadoopThriftAuthBridge.Server saslServer;
-   private static HiveDelegationTokenManager delegationTokenManager;
+   private static MetastoreDelegationTokenManager delegationTokenManager;
    private static boolean useSasl;
  
    public static final String NO_FILTER_STRING = "";
@@@ -437,41 -495,14 +497,15 @@@
          }
        }
  
-       //Start Metrics for Embedded mode
+       //Start Metrics
        if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
-         try {
-           MetricsFactory.init(hiveConf);
-         } catch (Exception e) {
-           // log exception, but ignore inability to start
-           LOG.error("error in Metrics init: " + e.getClass().getName() + " "
-               + e.getMessage(), e);
-         }
-       }
- 
-       Metrics metrics = MetricsFactory.getInstance();
-       if (metrics != null && hiveConf.getBoolVar(ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED)) {
          LOG.info("Begin calculating metadata count metrics.");
+         Metrics.initialize(hiveConf);
+         databaseCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_DATABASES);
+         tableCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_TABLES);
+         partCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_PARTITIONS);
          updateMetrics();
-         LOG.info("Finished metadata count metrics: " + initDatabaseCount + " databases, " + initTableCount +
-           " tables, " + initPartCount + " partitions.");
-         metrics.addGauge(MetricsConstant.INIT_TOTAL_DATABASES, new MetricsVariable<Object>() {
-           @Override
-           public Object getValue() {
-             return initDatabaseCount;
-           }
-         });
-         metrics.addGauge(MetricsConstant.INIT_TOTAL_TABLES, new MetricsVariable<Object>() {
-           @Override
-           public Object getValue() {
-             return initTableCount;
-           }
-         });
-         metrics.addGauge(MetricsConstant.INIT_TOTAL_PARTITIONS, new MetricsVariable<Object>() {
-           @Override
-           public Object getValue() {
-             return initPartCount;
-           }
-         });
++
        }
  
        preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
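
As a rough sketch of the metrics API the hunk above switches to (replacing MetricsFactory/CodahaleMetrics): only calls visible in this diff are used, and the package of the metastore Metrics/MetricsConstants classes is assumed, so treat this as illustrative rather than as the commit's code.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.metrics.Metrics;          // assumed package
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; // assumed package

public class MetastoreMetricsSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(ConfVars.METASTORE_METRICS, true);
    Metrics.initialize(conf);
    // Gauges are created up front; HiveMetaStore.updateMetrics() fills them in later.
    Metrics.getOrCreateGauge(MetricsConstants.TOTAL_DATABASES);
    Metrics.getOrCreateGauge(MetricsConstants.TOTAL_TABLES);
    Metrics.getOrCreateGauge(MetricsConstants.TOTAL_PARTITIONS);
    System.out.println(Metrics.getRegistry().getGauges().keySet());
  }
}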

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 8e89477,70451c4..37da2f8
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@@ -59,12 -60,11 +60,12 @@@ import org.apache.hadoop.hive.conf.Hive
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  import org.apache.hadoop.hive.conf.HiveConfUtil;
  import org.apache.hadoop.hive.metastore.api.*;
 +import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
  import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+ import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
  import org.apache.hadoop.hive.metastore.txn.TxnUtils;
- import org.apache.hadoop.hive.shims.ShimLoader;
  import org.apache.hadoop.hive.shims.Utils;
- import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
  import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hadoop.util.StringUtils;
  import org.apache.thrift.TApplicationException;

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index e7bbd62,bbe13fd..49005b9
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@@ -1981,59 -1967,156 +1970,212 @@@ public class MetaStoreUtils 
      return cols;
    }
  
 +  // TODO The following two utility methods can be moved to AcidUtils once no class in metastore is relying on them,
 +  // right now ObjectStore.getAllMmTablesForCleanup is calling these method
 +  /**
 +   * Checks if a table is an ACID table that only supports INSERT, but not UPDATE/DELETE
 +   * @param params table properties
 +   * @return true if table is an INSERT_ONLY table, false otherwise
 +   */
 +  // TODO# also check that transactional is true
 +  public static boolean isInsertOnlyTable(Map<String, String> params) {
 +    return isInsertOnlyTable(params, false);
 +  }
 +
 +  public static boolean isInsertOnlyTable(Map<String, String> params, boolean isCtas) {
 +    String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
 +    return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
 +  }
 +
 +   public static boolean isInsertOnlyTable(Properties params) {
 +    // TODO#  redirect for now - fix before merge
 +    HashMap<String, String> testMap = new HashMap<String, String>();
 +    for (String n  : params.stringPropertyNames()) {
 +      testMap.put(n, params.getProperty(n));
 +    }
 +    return isInsertOnlyTable(testMap);
 +  }
 +
 +   /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */
 +  public static Boolean isToInsertOnlyTable(Map<String, String> props) {
 +    // TODO# Setting these separately is a very hairy issue in certain combinations, since we
 +    //       cannot decide what type of table this becomes without taking both into account, and
 +    //       in many cases the conversion might be illegal.
 +    //       The only thing we allow is tx = true w/o tx-props, for backward compat.
 +    String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
 +    String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
 +    if (transactional == null && transactionalProp == null) return null; // Not affected.
 +    boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
 +    if (transactionalProp == null) {
 +      if (isSetToTxn) return false; // Assume the full ACID table.
 +      throw new RuntimeException("Cannot change '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
 +          + "' without '" + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "'");
 +    }
 +    if (!"insert_only".equalsIgnoreCase(transactionalProp)) return false; // Not MM.
 +    if (!isSetToTxn) {
 +      throw new RuntimeException("Cannot set '"
 +          + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "' to 'insert_only' without "
 +          + "setting '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL + "' to 'true'");
 +    }
 +    return true;
 +  }
 +
 +  public static boolean isRemovedInsertOnlyTable(Set<String> removedSet) {
 +    boolean hasTxn = removedSet.contains(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL),
 +        hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
 +    return hasTxn || hasProps;
 +  }
++
+   // given a list of partStats, this function will give you an aggr stats
+   public static List<ColumnStatisticsObj> aggrPartitionStats(List<ColumnStatistics> partStats,
+       String dbName, String tableName, List<String> partNames, List<String> colNames,
+       boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+       throws MetaException {
+     // 1. group by the stats by colNames
+     // map the colName to List<ColumnStatistics>
+     Map<String, List<ColumnStatistics>> map = new HashMap<>();
+     for (ColumnStatistics css : partStats) {
+       List<ColumnStatisticsObj> objs = css.getStatsObj();
+       for (ColumnStatisticsObj obj : objs) {
+         List<ColumnStatisticsObj> singleObj = new ArrayList<>();
+         singleObj.add(obj);
+         ColumnStatistics singleCS = new ColumnStatistics(css.getStatsDesc(), singleObj);
+         if (!map.containsKey(obj.getColName())) {
+           map.put(obj.getColName(), new ArrayList<ColumnStatistics>());
+         }
+         map.get(obj.getColName()).add(singleCS);
+       }
+     }
+     return aggrPartitionStats(map,dbName,tableName,partNames,colNames,useDensityFunctionForNDVEstimation, ndvTuner);
+   }
+ 
+   public static List<ColumnStatisticsObj> aggrPartitionStats(
+       Map<String, List<ColumnStatistics>> map, String dbName, String tableName,
+       final List<String> partNames, List<String> colNames,
+       final boolean useDensityFunctionForNDVEstimation,final double ndvTuner) throws MetaException {
+     List<ColumnStatisticsObj> colStats = new ArrayList<>();
+     // 2. Aggregate stats for each column in a separate thread
+     if (map.size()< 1) {
+       //stats are absent in RDBMS
+       LOG.debug("No stats data found for: dbName=" +dbName +" tblName=" + tableName +
+           " partNames= " + partNames + " colNames=" + colNames );
+       return colStats;
+     }
+     final ExecutorService pool = Executors.newFixedThreadPool(Math.min(map.size(), 16),
+         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aggr-col-stats-%d").build());
+     final List<Future<ColumnStatisticsObj>> futures = Lists.newLinkedList();
+ 
+     long start = System.currentTimeMillis();
+     for (final Entry<String, List<ColumnStatistics>> entry : map.entrySet()) {
+       futures.add(pool.submit(new Callable<ColumnStatisticsObj>() {
+         @Override
+         public ColumnStatisticsObj call() throws Exception {
+           List<ColumnStatistics> css = entry.getValue();
+           ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css
+               .iterator().next().getStatsObj().iterator().next().getStatsData().getSetField(),
+               useDensityFunctionForNDVEstimation, ndvTuner);
+           ColumnStatisticsObj statsObj = aggregator.aggregate(entry.getKey(), partNames, css);
+           return statsObj;
+         }}));
+     }
+     pool.shutdown();
+     for (Future<ColumnStatisticsObj> future : futures) {
+       try {
+         colStats.add(future.get());
+       } catch (InterruptedException | ExecutionException e) {
+         pool.shutdownNow();
+         LOG.debug(e.toString());
+         throw new MetaException(e.toString());
+       }
+     }
+     LOG.debug("Time for aggr col stats in seconds: {} Threads used: {}",
+       ((System.currentTimeMillis() - (double)start))/1000, Math.min(map.size(), 16));
+     return colStats;
+   }
+ 
+ 
+   /**
+    * Produce a hash for the storage descriptor
+    * @param sd storage descriptor to hash
+    * @param md message descriptor to use to generate the hash
+    * @return the hash as a byte array
+    */
+   public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md)  {
+     // Note all maps and lists have to be absolutely sorted.  Otherwise we'll produce different
+     // results for hashes based on the OS or JVM being used.
+     md.reset();
+     for (FieldSchema fs : sd.getCols()) {
+       md.update(fs.getName().getBytes(ENCODING));
+       md.update(fs.getType().getBytes(ENCODING));
+       if (fs.getComment() != null) md.update(fs.getComment().getBytes(ENCODING));
+     }
+     if (sd.getInputFormat() != null) {
+       md.update(sd.getInputFormat().getBytes(ENCODING));
+     }
+     if (sd.getOutputFormat() != null) {
+       md.update(sd.getOutputFormat().getBytes(ENCODING));
+     }
+     md.update(sd.isCompressed() ? "true".getBytes(ENCODING) : "false".getBytes(ENCODING));
+     md.update(Integer.toString(sd.getNumBuckets()).getBytes(ENCODING));
+     if (sd.getSerdeInfo() != null) {
+       SerDeInfo serde = sd.getSerdeInfo();
+       if (serde.getName() != null) {
+         md.update(serde.getName().getBytes(ENCODING));
+       }
+       if (serde.getSerializationLib() != null) {
+         md.update(serde.getSerializationLib().getBytes(ENCODING));
+       }
+       if (serde.getParameters() != null) {
+         SortedMap<String, String> params = new TreeMap<>(serde.getParameters());
+         for (Entry<String, String> param : params.entrySet()) {
+           md.update(param.getKey().getBytes(ENCODING));
+           md.update(param.getValue().getBytes(ENCODING));
+         }
+       }
+     }
+     if (sd.getBucketCols() != null) {
+       List<String> bucketCols = new ArrayList<>(sd.getBucketCols());
+       for (String bucket : bucketCols) md.update(bucket.getBytes(ENCODING));
+     }
+     if (sd.getSortCols() != null) {
+       SortedSet<Order> orders = new TreeSet<>(sd.getSortCols());
+       for (Order order : orders) {
+         md.update(order.getCol().getBytes(ENCODING));
+         md.update(Integer.toString(order.getOrder()).getBytes(ENCODING));
+       }
+     }
+     if (sd.getSkewedInfo() != null) {
+       SkewedInfo skewed = sd.getSkewedInfo();
+       if (skewed.getSkewedColNames() != null) {
+         SortedSet<String> colnames = new TreeSet<>(skewed.getSkewedColNames());
+         for (String colname : colnames) md.update(colname.getBytes(ENCODING));
+       }
+       if (skewed.getSkewedColValues() != null) {
+         SortedSet<String> sortedOuterList = new TreeSet<>();
+         for (List<String> innerList : skewed.getSkewedColValues()) {
+           SortedSet<String> sortedInnerList = new TreeSet<>(innerList);
+           sortedOuterList.add(StringUtils.join(sortedInnerList, "."));
+         }
+         for (String colval : sortedOuterList) md.update(colval.getBytes(ENCODING));
+       }
+       if (skewed.getSkewedColValueLocationMaps() != null) {
+         SortedMap<String, String> sortedMap = new TreeMap<>();
+         for (Entry<List<String>, String> smap : skewed.getSkewedColValueLocationMaps().entrySet()) {
+           SortedSet<String> sortedKey = new TreeSet<>(smap.getKey());
+           sortedMap.put(StringUtils.join(sortedKey, "."), smap.getValue());
+         }
+         for (Entry<String, String> e : sortedMap.entrySet()) {
+           md.update(e.getKey().getBytes(ENCODING));
+           md.update(e.getValue().getBytes(ENCODING));
+         }
+       }
+       md.update(sd.isStoredAsSubDirectories() ? "true".getBytes(ENCODING) : "false".getBytes(ENCODING));
+     }
+ 
+     return md.digest();
+   }
+ 
+   public static double decimalToDouble(Decimal decimal) {
+     return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue();
+   }
  }
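
To make the new insert-only ("MM") helpers above concrete, a small hypothetical caller. The property keys mirror hive_metastoreConstants.TABLE_IS_TRANSACTIONAL ("transactional") and TABLE_TRANSACTIONAL_PROPERTIES ("transactional_properties"); everything else is illustrative.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;

public class InsertOnlyTableSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("transactional", "true");
    props.put("transactional_properties", "insert_only");

    // true: these properties describe an insert-only (MM) ACID table.
    System.out.println(MetaStoreUtils.isInsertOnlyTable(props));

    // Nullable three-way answer for ALTER TABLE props: null = MM-ness unaffected,
    // TRUE = becomes MM, FALSE = not MM (e.g. full ACID); illegal combinations throw.
    System.out.println(MetaStoreUtils.isToInsertOnlyTable(props));
  }
}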

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 88afe03,0db1bc0..903b2c7
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@@ -59,9 -58,11 +59,12 @@@ import javax.jdo.PersistenceManagerFact
  import javax.jdo.Query;
  import javax.jdo.Transaction;
  import javax.jdo.datastore.DataStoreCache;
 +import javax.jdo.datastore.JDOConnection;
  import javax.jdo.identity.IntIdentity;
+ import javax.sql.DataSource;
  
+ import com.codahale.metrics.Counter;
+ import com.codahale.metrics.MetricRegistry;
  import org.apache.commons.lang.ArrayUtils;
  import org.apache.commons.lang.exception.ExceptionUtils;
  import org.apache.hadoop.conf.Configurable;
@@@ -415,10 -431,13 +434,14 @@@ public class ObjectStore implements Raw
      pm = getPersistenceManager();
      isInitialized = pm != null;
      if (isInitialized) {
 +      dbType = determineDatabaseProduct();
        expressionProxy = createExpressionProxy(hiveConf);
        if (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)) {
-         directSql = new MetaStoreDirectSql(pm, hiveConf, dbType);
+         String schema = prop.getProperty("javax.jdo.mapping.Schema");
+         if (schema != null && schema.isEmpty()) {
+           schema = null;
+         }
+         directSql = new MetaStoreDirectSql(pm, hiveConf, schema);
        }
      }
      LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
@@@ -2733,8 -2759,7 +2783,8 @@@
        boolean isConfigEnabled = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)
            && (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL_DDL) || !isInTxn);
        if (isConfigEnabled && directSql == null) {
 -        directSql = new MetaStoreDirectSql(pm, getConf());
 +        dbType = determineDatabaseProduct();
-         directSql = new MetaStoreDirectSql(pm, getConf(), dbType);
++        directSql = new MetaStoreDirectSql(pm, getConf(), "");
        }
  
        if (!allowJdo && isConfigEnabled && !directSql.isCompatibleDatastore()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index ed05381,3a3d184..dabede4
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@@ -39,8 -39,6 +39,7 @@@ public final class TransactionalValidat
  
    // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
    public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
-   public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
 +  public static final String INSERTONLY_TRANSACTIONAL_PROPERTY = "insert_only";
  
    TransactionalValidationListener(Configuration conf) {
      super(conf);
@@@ -104,13 -114,10 +115,13 @@@
        //normalize prop name
        parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
      }
-     if ("true".equalsIgnoreCase(transactionalValue)) {
+     if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
+       // only need to check conformance if ALTER TABLE enabled ACID
        if (!conformToAcid(newTable)) {
 -        throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
 +        // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing
 +        if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) {
-           throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
-               " format (such as ORC)");
++          throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
 +        }
        }
  
        if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@@ -204,13 -194,9 +204,12 @@@
        return;
      }
  
 -    if ("true".equalsIgnoreCase(transactionalValue)) {
 +    if ("true".equalsIgnoreCase(transactional)) {
        if (!conformToAcid(newTable)) {
 -        throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
 +        // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing
 +        if (transactionalProperties == null || !"insert_only".equalsIgnoreCase(transactionalProperties)) {
-           throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
-               " format (such as ORC)");
++          throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
 +        }
        }
  
        if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@@ -289,8 -273,6 +286,7 @@@
      boolean isValid = false;
      switch (transactionalProperties) {
        case DEFAULT_TRANSACTIONAL_PROPERTY:
-       case LEGACY_TRANSACTIONAL_PROPERTY:
 +      case INSERTONLY_TRANSACTIONAL_PROPERTY:
          isValid = true;
          break;
        default:
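
The relaxed check above boils down to the standalone sketch below. It is not the listener's actual code (the real conformToAcid also inspects input/output formats and bucket columns, and the real check throws MetaException); it only illustrates the insert_only exemption introduced here.

public class AcidFormatCheckSketch {
  /** Mirrors the conformance check in TransactionalValidationListener above. */
  static void checkTransactionalTable(boolean conformsToAcid, String transactionalProperties) {
    if (!conformsToAcid) {
      // insert_only (MM) tables are exempt from the ORC/bucketing requirement.
      if (transactionalProperties == null || !"insert_only".equalsIgnoreCase(transactionalProperties)) {
        throw new IllegalArgumentException(
            "The table must be stored using an ACID compliant format (such as ORC)");
      }
    }
  }

  public static void main(String[] args) {
    checkTransactionalTable(false, "insert_only"); // accepted
    checkTransactionalTable(false, "default");     // throws
  }
}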

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 906591d,93d1ba6..a97ba1a
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@@ -18,7 -18,11 +18,16 @@@
  package org.apache.hadoop.hive.metastore.cache;
  
  import java.nio.ByteBuffer;
- import java.util.*;
+ import java.util.ArrayList;
++import java.util.ArrayList;
++import java.util.HashMap;
+ import java.util.HashMap;
+ import java.util.LinkedList;
++import java.util.LinkedList;
+ import java.util.List;
++import java.util.List;
++import java.util.Map;
+ import java.util.Map;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ThreadFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --cc metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 6d851a0,fb16cfc..0f7827b
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@@ -110,7 -114,14 +112,12 @@@ public class DummyRawStoreForJdoConnect
    }
  
    @Override
+   public boolean isActiveTransaction() {
+     return false;
+   }
+ 
+   @Override
    public void rollbackTransaction() {
 -
 -
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --cc metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 9542990,08228af..7573fb2
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@@ -379,13 -362,14 +377,14 @@@ public class TestObjectStore 
    public void testDirectSqlErrorMetrics() throws Exception {
      HiveConf conf = new HiveConf();
      conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
-     conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name()
-         + "," + MetricsReporting.JMX.name());
+     Metrics.initialize(conf);
  
-     MetricsFactory.init(conf);
-     CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance();
+     // recall setup so that we get an object store with the metrics initialized
+     setUp();
+     Counter directSqlErrors =
+         Metrics.getRegistry().getCounters().get(MetricsConstants.DIRECTSQL_ERRORS);
  
 -    objectStore.new GetDbHelper("foo", null, true, true) {
 +    objectStore.new GetDbHelper("foo", true, true) {
        @Override
        protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException {
          return null;
@@@ -398,11 -382,9 +397,9 @@@
        }
      }.run(false);
  
-     String json = metrics.dumpJson();
-     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER,
-         MetricsConstant.DIRECTSQL_ERRORS, "");
+     Assert.assertEquals(0, directSqlErrors.getCount());
  
 -    objectStore.new GetDbHelper("foo", null, true, true) {
 +    objectStore.new GetDbHelper("foo", true, true) {
        @Override
        protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException {
          throw new RuntimeException();

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 82a9fad,acc2390..b6eff94
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@@ -3846,11 -3867,42 +3884,22 @@@ public class DDLTask extends Task<DDLWo
            throw new HiveException(ErrorMsg.REPLACE_CANNOT_DROP_COLUMNS, alterTbl.getOldName());
          }
        }
+ 
+       boolean partitioned = tbl.isPartitioned();
+       boolean droppingColumns = alterTbl.getNewCols().size() < sd.getCols().size();
+       if (ParquetHiveSerDe.isParquetTable(tbl) &&
+           isSchemaEvolutionEnabled(tbl) &&
+           !alterTbl.getIsCascade() &&
+           droppingColumns && partitioned) {
+         LOG.warn("Cannot drop columns from a partitioned parquet table without the CASCADE option");
+         throw new HiveException(ErrorMsg.REPLACE_CANNOT_DROP_COLUMNS,
+             alterTbl.getOldName());
+       }
        sd.setCols(alterTbl.getNewCols());
      } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) {
 -      if (StatsSetupConst.USER.equals(environmentContext.getProperties()
 -              .get(StatsSetupConst.STATS_GENERATED))) {
 -        environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
 -      }
 -      if (part != null) {
 -        part.getTPartition().getParameters().putAll(alterTbl.getProps());
 -      } else {
 -        tbl.getTTable().getParameters().putAll(alterTbl.getProps());
 -      }
 +      return alterTableAddProps(alterTbl, tbl, part, environmentContext);
      } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.DROPPROPS) {
 -      Iterator<String> keyItr = alterTbl.getProps().keySet().iterator();
 -      if (StatsSetupConst.USER.equals(environmentContext.getProperties()
 -          .get(StatsSetupConst.STATS_GENERATED))) {
 -        // drop a stats parameter, which triggers recompute stats update automatically
 -        environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
 -      }
 -      while (keyItr.hasNext()) {
 -        if (part != null) {
 -          part.getTPartition().getParameters().remove(keyItr.next());
 -        } else {
 -          tbl.getTTable().getParameters().remove(keyItr.next());
 -        }
 -      }
 +      return alterTableDropProps(alterTbl, tbl, part, environmentContext);
      } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDEPROPS) {
        StorageDescriptor sd = retrieveStorageDescriptor(tbl, part);
        sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps());
@@@ -4002,219 -4054,7 +4051,215 @@@
        throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, alterTbl.getOp().toString());
      }
  
 -    return 0;
 +    return null;
 +  }
 +
 +  private List<Task<?>> alterTableDropProps(AlterTableDesc alterTbl, Table tbl,
 +      Partition part, EnvironmentContext environmentContext) throws HiveException {
 +    if (StatsSetupConst.USER.equals(environmentContext.getProperties()
 +        .get(StatsSetupConst.STATS_GENERATED))) {
 +      // drop a stats parameter, which triggers recompute stats update automatically
 +      environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
 +    }
 +
 +    List<Task<?>> result = null;
 +    if (part == null) {
 +      Set<String> removedSet = alterTbl.getProps().keySet();
 +      boolean isFromMmTable = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()),
 +          isRemoved = MetaStoreUtils.isRemovedInsertOnlyTable(removedSet);
 +      if (isFromMmTable && isRemoved) {
 +        result = generateRemoveMmTasks(tbl);
 +      }
 +    }
 +    Iterator<String> keyItr = alterTbl.getProps().keySet().iterator();
 +    while (keyItr.hasNext()) {
 +      if (part != null) {
 +        part.getTPartition().getParameters().remove(keyItr.next());
 +      } else {
 +        tbl.getTTable().getParameters().remove(keyItr.next());
 +      }
 +    }
 +    return result;
 +  }
 +
 +  private List<Task<?>> generateRemoveMmTasks(Table tbl) throws HiveException {
 +    // To avoid confusion from nested MM directories when table is converted back and forth, we
 +    // want to rename mm_ dirs to remove the prefix; however, given the unpredictable nested
 +    // directory handling in Hive/MR, we will instead move all the files into the root directory.
 +    // We will also delete any directories that are not committed. 
 +    // Note that this relies on locks. Note also that we only do the renames AFTER the metastore
 +    // operation commits. Deleting uncommitted things is safe, but moving stuff before we convert
 +    // could cause data loss.
 +    List<Path> allMmDirs = new ArrayList<>();
 +    if (tbl.isStoredAsSubDirectories()) {
 +      // TODO: support this? we only bail because it's a PITA and hardly anyone seems to care.
 +      throw new HiveException("Converting list bucketed tables stored as subdirectories "
 +          + " to and from MM is not supported");
 +    }
 +    List<String> bucketCols = tbl.getBucketCols();
 +    if (bucketCols != null && !bucketCols.isEmpty()
 +        && HiveConf.getBoolVar(conf, ConfVars.HIVE_STRICT_CHECKS_BUCKETING)) {
 +      throw new HiveException("Converting bucketed tables from MM is not supported by default; "
 +          + "copying files from multiple MM directories may potentially break the buckets. You "
 +          + "can set " + ConfVars.HIVE_STRICT_CHECKS_BUCKETING.varname
 +          + " to false for this query if you want to force the conversion.");
 +    }
 +    Hive db = getHive();
 +    String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
 +    ValidTxnList validTxnList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
 +    if (tbl.getPartitionKeys().size() > 0) {
 +      PartitionIterable parts = new PartitionIterable(db, tbl, null,
 +          HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
 +      Iterator<Partition> partIter = parts.iterator();
 +      while (partIter.hasNext()) {
 +        Partition part = partIter.next();
 +        checkMmLb(part);
 +        handleRemoveMm(part.getDataLocation(), validTxnList, allMmDirs);
 +      }
 +    } else {
 +      checkMmLb(tbl);
 +      handleRemoveMm(tbl.getDataLocation(), validTxnList, allMmDirs);
 +    }
 +    List<Path> targetPaths = new ArrayList<>(allMmDirs.size());
 +    List<String> targetPrefix = new ArrayList<>(allMmDirs.size());
 +    int prefixLen = JavaUtils.DELTA_PREFIX.length();
 +    for (int i = 0; i < allMmDirs.size(); ++i) {
 +      Path src = allMmDirs.get(i);
 +      Path tgt = src.getParent();
 +      String prefix = src.getName().substring(prefixLen + 1) + "_";
 +      Utilities.LOG14535.info("Will move " + src + " to " + tgt + " (prefix " + prefix + ")");
 +      targetPaths.add(tgt);
 +      targetPrefix.add(prefix);
 +    }
 +    // Don't set inputs and outputs - the locks have already been taken so it's pointless.
 +    MoveWork mw = new MoveWork(null, null, null, null, false);
 +    mw.setMultiFilesDesc(new LoadMultiFilesDesc(
 +        allMmDirs, targetPaths, targetPrefix, true, null, null));
 +    return Lists.<Task<?>>newArrayList(TaskFactory.get(mw, conf));
 +  }
 +
 +  private void checkMmLb(Table tbl) throws HiveException {
 +    if (!tbl.isStoredAsSubDirectories()) return;
 +    // TODO: support this?
 +    throw new HiveException("Converting list bucketed tables stored as subdirectories "
 +        + " to and from MM is not supported");
 +  }
 +
 +  private void checkMmLb(Partition part) throws HiveException {
 +    if (!part.isStoredAsSubDirectories()) return;
 +    // TODO: support this?
 +    throw new HiveException("Converting list bucketed tables stored as subdirectories "
 +        + " to and from MM is not supported. Please create a table in the desired format.");
 +  }
 +
 +  private void handleRemoveMm(
 +      Path path, ValidTxnList validTxnList, List<Path> result) throws HiveException {
 +    // Note: doesn't take LB into account; that is not presently supported here (throws above).
 +    try {
 +      FileSystem fs = path.getFileSystem(conf);
 +      for (FileStatus file : fs.listStatus(path)) {
 +        Path childPath = file.getPath();
 +        if (!file.isDirectory()) {
 +          ensureDelete(fs, childPath, "a non-directory file");
 +          continue;
 +        }
 +        Long writeId = JavaUtils.extractTxnId(childPath);
 +        if (writeId == null) {
 +          ensureDelete(fs, childPath, "an unknown directory");
 +        } else if (!validTxnList.isTxnValid(writeId)) {
 +          // Assume no concurrent active writes - we rely on locks here. We could check and fail.
 +          ensureDelete(fs, childPath, "an uncommitted directory");
 +        } else {
 +          result.add(childPath);
 +        }
 +      }
 +    } catch (IOException ex) {
 +      throw new HiveException(ex);
 +    }
 +  }
 +
 +  private static void ensureDelete(FileSystem fs, Path path, String what) throws IOException {
 +    Utilities.LOG14535.info("Deleting " + what + " " + path);
 +    try {
 +      if (!fs.delete(path, true)) throw new IOException("delete returned false");
 +    } catch (Exception ex) {
 +      String error = "Couldn't delete " + path + "; cannot remove MM setting from the table";
 +      LOG.error(error, ex);
 +      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
 +    }
 +  }
 +
 +  private List<Task<?>> generateAddMmTasks(Table tbl) throws HiveException {
 +    // We will move all the files in the table/partition directories into the first MM
 +    // directory, then commit the first write ID.
 +    List<Path> srcs = new ArrayList<>(), tgts = new ArrayList<>();
 +    long mmWriteId = 0;
 +    try {
 +      HiveTxnManager txnManager = SessionState.get().getTxnMgr();
 +      if (txnManager.isTxnOpen()) {
 +        mmWriteId = txnManager.getCurrentTxnId();
 +      } else {
 +        mmWriteId = txnManager.openTxn(new Context(conf), conf.getUser());
 +        txnManager.commitTxn();
 +      }
 +    } catch (Exception e) {
 +      String errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
 +      console.printError(errorMessage, "\n"
 +          + org.apache.hadoop.util.StringUtils.stringifyException(e));
 +    }
 +    int stmtId = 0;
 +    String mmDir = AcidUtils.deltaSubdir(mmWriteId, mmWriteId, stmtId);
 +    Hive db = getHive();
 +    if (tbl.getPartitionKeys().size() > 0) {
 +      PartitionIterable parts = new PartitionIterable(db, tbl, null,
 +          HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
 +      Iterator<Partition> partIter = parts.iterator();
 +      while (partIter.hasNext()) {
 +        Partition part = partIter.next();
 +        checkMmLb(part);
 +        Path src = part.getDataLocation(), tgt = new Path(src, mmDir);
 +        srcs.add(src);
 +        tgts.add(tgt);
 +        Utilities.LOG14535.info("Will move " + src + " to " + tgt);
 +      }
 +    } else {
 +      checkMmLb(tbl);
 +      Path src = tbl.getDataLocation(), tgt = new Path(src, mmDir);
 +      srcs.add(src);
 +      tgts.add(tgt);
 +      Utilities.LOG14535.info("Will move " + src + " to " + tgt);
 +    }
 +    // Don't set inputs and outputs - the locks have already been taken so it's pointless.
 +    MoveWork mw = new MoveWork(null, null, null, null, false);
 +    mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null));
 +    ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId);
 +    Task<?> mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf);
 +    mv.addDependentTask(ic);
 +    return Lists.<Task<?>>newArrayList(mv);
 +  }
 +
 +  private List<Task<?>> alterTableAddProps(AlterTableDesc alterTbl, Table tbl,
 +      Partition part, EnvironmentContext environmentContext) throws HiveException {
 +    if (StatsSetupConst.USER.equals(environmentContext.getProperties()
 +        .get(StatsSetupConst.STATS_GENERATED))) {
 +      environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
 +    }
-     if(alterTbl.getProps().containsKey(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY)) {
-       NanoTimeUtils.validateTimeZone(
-           alterTbl.getProps().get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY));
-     }
 +    List<Task<?>> result = null;
 +    if (part != null) {
 +      part.getTPartition().getParameters().putAll(alterTbl.getProps());
 +    } else {
 +      boolean isFromMmTable = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters());
 +      Boolean isToMmTable = MetaStoreUtils.isToInsertOnlyTable(alterTbl.getProps());
 +      if (isToMmTable != null) {
 +        if (!isFromMmTable && isToMmTable) {
 +          result = generateAddMmTasks(tbl);
 +        } else if (isFromMmTable && !isToMmTable) {
 +          result = generateRemoveMmTasks(tbl);
 +        }
 +      }
 +      tbl.getTTable().getParameters().putAll(alterTbl.getProps());
 +    }
 +    return result;
    }
  
    private int dropConstraint(Hive db, AlterTableDesc alterTbl)
@@@ -4583,22 -4426,8 +4631,8 @@@
        }
      }
  
-     // If HIVE_PARQUET_INT96_DEFAULT_UTC_WRITE_ZONE is set to True, then set new Parquet tables timezone
-     // to UTC by default (only if the table property is not set)
-     if (ParquetHiveSerDe.isParquetTable(tbl)) {
-       SessionState ss = SessionState.get();
-       String parquetTimezone = tbl.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY);
-       if (parquetTimezone == null || parquetTimezone.isEmpty()) {
-         if (ss.getConf().getBoolVar(ConfVars.HIVE_PARQUET_INT96_DEFAULT_UTC_WRITE_ZONE)) {
-           tbl.setProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
-         }
-       } else {
-         NanoTimeUtils.validateTimeZone(parquetTimezone);
-       }
-     }
- 
      // create the table
 -    if (crtTbl.getReplaceMode()){
 +    if (crtTbl.getReplaceMode()) {
        // replace-mode creates are really alters using CreateTableDesc.
        try {
          db.alterTable(tbl.getDbName()+"."+tbl.getTableName(),tbl,null);
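
The conversion-to-MM path above (generateAddMmTasks) moves every existing file into a first delta directory whose name comes from AcidUtils.deltaSubdir. A tiny sketch of just that naming, with the write and statement ids hard-coded for illustration:

import org.apache.hadoop.hive.ql.io.AcidUtils;

public class MmDeltaDirSketch {
  public static void main(String[] args) {
    long mmWriteId = 1L; // in DDLTask this comes from the open (or freshly opened) transaction
    int stmtId = 0;
    // Prints something like "delta_0000001_0000001_0000" -- the directory existing files move into.
    System.out.println(AcidUtils.deltaSubdir(mmWriteId, mmWriteId, stmtId));
  }
}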

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 0582f94,13750cd..d2d9946
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@@ -382,14 -368,7 +379,11 @@@ public class FetchOperator implements S
  
        Class<? extends InputFormat> formatter = currDesc.getInputFileFormatClass();
        Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
-       if (ParquetHiveSerDe.class.getName().equals(currDesc.getTableDesc().getSerdeClassName())) {
-         ParquetTableUtils.setParquetTimeZoneIfAbsent(job, currDesc.getTableDesc().getProperties());
-       }
        InputFormat inputFormat = getInputFormatFromCache(formatter, job);
 +      String inputs = processCurrPathForMmWriteIds(inputFormat);
 +      Utilities.LOG14535.info("Setting fetch inputs to " + inputs);
 +      if (inputs == null) return null;
 +      job.set("mapred.input.dir", inputs);
  
        InputSplit[] splits = inputFormat.getSplits(job, 1);
        FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
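
On the fetch side, the change above ultimately narrows mapred.input.dir to the committed MM directories. A trivial sketch of that JobConf interaction, with a hypothetical path standing in for whatever processCurrPathForMmWriteIds would return:

import org.apache.hadoop.mapred.JobConf;

public class FetchInputsSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Hypothetical result of processCurrPathForMmWriteIds: only committed delta dirs remain.
    String inputs = "/warehouse/mm_table/delta_0000001_0000001_0000";
    if (inputs == null) {
      return; // FetchOperator bails out the same way when nothing is readable
    }
    job.set("mapred.input.dir", inputs);
    System.out.println(job.get("mapred.input.dir"));
  }
}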

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 7e1b8fa,bc265eb..3544884
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@@ -41,11 -28,11 +41,13 @@@ import org.apache.hadoop.hive.common.St
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.conf.HiveConfUtil;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
  import org.apache.hadoop.hive.ql.CompilationOpContext;
  import org.apache.hadoop.hive.ql.ErrorMsg;
 +import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
 +import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+ import org.apache.hadoop.hive.ql.io.BucketCodec;
  import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
  import org.apache.hadoop.hive.ql.io.HiveKey;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@@ -165,28 -149,11 +167,28 @@@ public class FileSinkOperator extends T
      Path[] finalPaths;
      RecordWriter[] outWriters;
      RecordUpdater[] updaters;
-     private Stat stat;
+     Stat stat;
 +    int acidLastBucket = -1;
 +    int acidFileOffset = -1;
 +    private boolean isMmTable;
 +    private Long txnId;
 +    private int stmtId;
 +
 +    public FSPaths(Path specPath, boolean isMmTable) {
 +      this.isMmTable = isMmTable;
 +      if (!isMmTable) {
 +        tmpPath = Utilities.toTempPath(specPath);
 +        taskOutputTempPath = Utilities.toTaskTempPath(specPath);
 +      } else {
 +        tmpPath = specPath;
 +        taskOutputTempPath = null; // Should not be used.
 +        txnId = conf.getTransactionId();
 +        stmtId = conf.getStatementId();
 +      }
 +      Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts
 +          + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
 +          + " (spec path " + specPath + ")"/*, new Exception()*/);
  
 -    public FSPaths(Path specPath) {
 -      tmpPath = Utilities.toTempPath(specPath);
 -      taskOutputTempPath = Utilities.toTaskTempPath(specPath);
        outPaths = new Path[numFiles];
        finalPaths = new Path[numFiles];
        outWriters = new RecordWriter[numFiles];
@@@ -881,10 -758,10 +885,10 @@@
        // for a given operator branch prediction should work quite nicely on it.
      // RecordUpdater expects to get the actual row, not a serialized version of it.  Thus we
        // pass the row rather than recordValue.
 -      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
 +      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
-         rowOutWriters[writerOffset].write(recordValue);
+         rowOutWriters[findWriterOffset(row)].write(recordValue);
        } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
-         fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
+         fpaths.updaters[findWriterOffset(row)].insert(conf.getTransactionId(), row);
        } else {
          // TODO I suspect we could skip much of the stuff above this in the function in the case
          // of update and delete.  But I don't understand all of the side effects of the above

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e86ca3a,cde2805..34d0598
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@@ -403,232 -579,6 +436,231 @@@ public class MoveTask extends Task<Move
        return (1);
      }
    }
 +
 +  private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
 +    List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),  tbd.getPartitionSpec());
 +    db.validatePartitionNameCharacters(partVals);
 +    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
 +        + " into " + tbd.getTable().getTableName());
 +    db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(), tbd.getReplace(),
 +        tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
 +        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
 +            !tbd.isMmTable(),
 +        hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId());
 +    Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 +
 +    // See the comment inside updatePartitionBucketSortColumns.
 +    if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
 +      updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
 +          ti.numBuckets, ti.sortCols);
 +    }
 +
 +    DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +    // add this partition to post-execution hook
 +    if (work.getOutputs() != null) {
 +      DDLTask.addIfAbsentByName(new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
 +    }
 +    return dc;
 +  }
 +
 +  private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
 +      IOException, InvalidOperationException {
 +    DataContainer dc;
 +    List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
 +
 +    console.printInfo(System.getProperty("line.separator"));
 +    long startTime = System.currentTimeMillis();
 +    // load the list of DP partitions and return the list of partition specs
 +    // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
 +    // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
 +    // After that check the number of DPs created to not exceed the limit and
 +    // iterate over it and call loadPartition() here.
 +    // The reason we don't do inside HIVE-1361 is the latter is large and we
 +    // want to isolate any potential issue it may introduce.
 +    if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
 +      throw new HiveException("Only single-partition LoadTableDesc can skip committing write ID");
 +    }
 +    Map<Map<String, String>, Partition> dp =
 +      db.loadDynamicPartitions(
 +        tbd.getSourcePath(),
 +        tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(),
 +        tbd.getReplace(),
 +        dpCtx.getNumDPCols(),
 +        (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(),
 +        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
 +            !tbd.isMmTable(),
 +        SessionState.get().getTxnMgr().getCurrentTxnId(), tbd.getStmtId(), hasFollowingStatsTask(),
 +        work.getLoadTableWork().getWriteType());
 +
 +    // publish DP columns to its subscribers
 +    if (dps != null && dps.size() > 0) {
 +      pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
 +    }
 +
 +    String loadTime = "\t Time taken to load dynamic partitions: "  +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
 +    console.printInfo(loadTime);
 +    LOG.info(loadTime);
 +
 +    if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
 +      throw new HiveException("This query creates no partitions." +
 +          " To turn off this error, set hive.error.on.empty.partition=false.");
 +    }
 +
 +    startTime = System.currentTimeMillis();
 +    // for each partition spec, get the partition
 +    // and put it to WriteEntity for post-exec hook
 +    for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
 +      Partition partn = entry.getValue();
 +
 +      // See the comment inside updatePartitionBucketSortColumns.
 +      if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
 +        updatePartitionBucketSortColumns(
 +            db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
 +      }
 +
 +      WriteEntity enty = new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType()));
 +      if (work.getOutputs() != null) {
 +        DDLTask.addIfAbsentByName(enty, work.getOutputs());
 +      }
 +      // Need to update the queryPlan's output as well so that post-exec hook get executed.
 +      // This is only needed for dynamic partitioning since for SP the WriteEntity is
 +      // constructed at compile time and the queryPlan already contains that.
 +      // For DP, WriteEntity creation is deferred at this stage so we need to update
 +      // queryPlan here.
 +      if (queryPlan.getOutputs() == null) {
 +        queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
 +      }
 +      queryPlan.getOutputs().add(enty);
 +
 +      // update columnar lineage for each partition
 +      dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +
 +      // Don't set lineage on delete as we don't have all the columns
 +      if (SessionState.get() != null &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
 +        SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
 +            table.getCols());
 +      }
 +      LOG.info("\tLoading partition " + entry.getKey());
 +    }
 +    console.printInfo("\t Time taken for adding to write entity : " +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 +    dc = null; // reset data container to prevent it being added again.
 +    return dc;
 +  }
 +
 +  private void inferTaskInformation(TaskInformation ti) {
 +    // Find the first ancestor of this MoveTask which is some form of map reduce task
 +    // (Either standard, local, or a merge)
 +    while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
 +      ti.task = (Task)ti.task.getParentTasks().get(0);
 +      // If it was a merge task or a local map reduce task, nothing can be inferred
 +      if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
 +        break;
 +      }
 +
 +      // If it's a standard map reduce task, check what, if anything, it inferred about
 +      // the directory this move task is moving
 +      if (ti.task instanceof MapRedTask) {
 +        MapredWork work = (MapredWork)ti.task.getWork();
 +        MapWork mapWork = work.getMapWork();
 +        ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
 +        ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
 +        if (work.getReduceWork() != null) {
 +          ti.numBuckets = work.getReduceWork().getNumReduceTasks();
 +        }
 +
 +        if (ti.bucketCols != null || ti.sortCols != null) {
 +          // This must be a final map reduce task (the task containing the file sink
 +          // operator that writes the final output)
 +          assert work.isFinalMapRed();
 +        }
 +        break;
 +      }
 +
 +      // If it's a move task, get the path the files were moved from; this is what any
 +      // preceding map reduce task inferred information about, and moving does not invalidate
 +      // those assumptions.
 +      // This can happen when a conditional merge is added before the final MoveTask, but the
 +      // condition for merging is not met; see GenMRFileSink1.
 +      if (ti.task instanceof MoveTask) {
 +        MoveTask mt = (MoveTask)ti.task;
 +        if (mt.getWork().getLoadFileWork() != null) {
 +          ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
 +        }
 +      }
 +    }
 +  }
 +
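 +  /**
 +   * Checks, when the work requests it, that the files being loaded match the expected input
 +   * format: a single file is sampled from the source path and compared against the input
 +   * format of the target table, or of the existing target partition if one is specified.
 +   * Skipped when a dynamic partition context is present; throws HiveException on a mismatch.
 +   */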
 +  private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
 +      throws HiveException {
 +    if (work.getCheckFileFormat()) {
 +      // Get all files from the src directory
 +      FileStatus[] dirs;
 +      ArrayList<FileStatus> files;
 +      FileSystem srcFs; // source filesystem
 +      try {
 +        srcFs = tbd.getSourcePath().getFileSystem(conf);
 +        dirs = srcFs.globStatus(tbd.getSourcePath());
 +        files = new ArrayList<FileStatus>();
 +        for (int i = 0; (dirs != null && i < dirs.length); i++) {
 +          files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
 +          // We only check one file, so exit the loop when we have at least
 +          // one.
 +          if (files.size() > 0) {
 +            break;
 +          }
 +        }
 +      } catch (IOException e) {
 +        throw new HiveException(
 +            "addFiles: filesystem error in check phase", e);
 +      }
 +
 +      // handle file format check for table level
 +      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
 +        boolean flag = true;
 +        // work.checkFileFormat is set to true only for the Load Task, so the assumption
 +        // here is that the dynamic partition context is null.
 +        if (tbd.getDPCtx() == null) {
 +          if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
 +            // Check if the file format of the file matches that of the table.
 +            flag = HiveFileFormatUtils.checkInputFormat(
 +                srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +          } else {
 +            // Check if the file format of the file matches that of the partition
 +            Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
 +            if (oldPart == null) {
 +              // This means we have just created a table and are specifying a partition in the
 +              // load statement (without pre-creating the partition), in which case let's use
 +              // the table's input format class. inheritTableSpecs defaults to true, so when a
 +              // new partition is created later it will automatically inherit the input format
 +              // from the table object.
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +            } else {
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, oldPart.getInputFormatClass(), files);
 +            }
 +          }
 +          if (!flag) {
-             throw new HiveException(
-                 "Wrong file format. Please check the file's format.");
++            throw new HiveException(ErrorMsg.WRONG_FILE_FORMAT);
 +          }
 +        } else {
 +          LOG.warn("Skipping file format check as dpCtx is not null");
 +        }
 +      }
 +    }
 +  }
 +
 +
    /**
    * so to make sure we create WriteEntity with the right WriteType.  This is (at this point) only
     * for consistency since LockManager (which is the only thing that pays attention to WriteType)

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ea0a0fd,aca99f2..88f5a0d
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@@ -1445,14 -1447,21 +1471,21 @@@ public final class Utilities 
          // create empty buckets if necessary
          if (emptyBuckets.size() > 0) {
            perfLogger.PerfLogBegin("FileSinkOperator", "CreateEmptyBuckets");
 -          createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
 +          createEmptyBuckets(
 +              hconf, emptyBuckets, conf.getCompressed(), conf.getTableInfo(), reporter);
+           filesKept.addAll(emptyBuckets);
            perfLogger.PerfLogEnd("FileSinkOperator", "CreateEmptyBuckets");
          }
 -
          // move to the file destination
 -        log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
 +        Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
          perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
-         Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+         if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR)) {
+           // HIVE-17113 - avoid copying files that may have been written to the temp dir by runaway tasks,
+           // by moving just the files we've tracked from removeTempOrDuplicateFiles().
+           Utilities.moveSpecifiedFiles(fs, tmpPath, specPath, filesKept);
+         } else {
+           Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+         }
          perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
        }
      } else {
@@@ -1534,27 -1552,7 +1578,27 @@@
     * @return a list of path names corresponding to should-be-created empty buckets.
     */
    public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
-       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<Path> filesKept) throws IOException {
 +    int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
 +        numBuckets = (conf != null && conf.getTable() != null)
 +          ? conf.getTable().getNumBuckets() : 0;
 +    return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null, 0, false);
 +  }
 +  
 +  private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
 +    FileStatus[] items = fs.listStatus(path);
 +    // remove empty directory since DP insert should not generate empty partitions.
 +    // empty directories could be generated by crashed Task/ScriptOperator
 +    if (items.length != 0) return false;
 +    if (!fs.delete(path, true)) {
 +      LOG.error("Cannot delete empty directory " + path);
 +      throw new IOException("Cannot delete empty directory " + path);
 +    }
 +    return true;
 +  }
 +
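 +  /**
 +   * Variant that takes the dynamic partition level count, bucket count and MM table
 +   * information (txnId, stmtId, isMmTable) directly instead of deriving them from the
 +   * DynamicPartitionCtx and FileSinkDesc.
 +   */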
 +  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
 +      int dpLevels, int numBuckets, Configuration hconf, Long txnId, int stmtId, boolean isMmTable) throws IOException {
      if (fileStats == null) {
        return null;
      }
@@@ -3145,21 -3150,14 +3235,21 @@@
        LOG.info("Processing alias " + alias);
  
        // The alias may not have any path
 +      Collection<Map.Entry<Path, ArrayList<String>>> pathToAliases =
 +          work.getPathToAliases().entrySet();
 +      if (!skipDummy) {
 +        // Copy the entries to avoid ConcurrentModificationException when a dummy path is added.
 +        pathToAliases = new ArrayList<>(pathToAliases);
 +      }
        boolean isEmptyTable = true;
        boolean hasLogged = false;
 -      // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
 -      for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
 +      Path path = null;
 +      for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) {
          if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
-           throw new IOException("Operation is Canceled. ");
+           throw new IOException("Operation is Canceled.");
  
 -        List<String> aliases = work.getPathToAliases().get(file);
 +        Path file = e.getKey();
 +        List<String> aliases = e.getValue();
          if (aliases.contains(alias)) {
            if (file != null) {
              isEmptyTable = false;

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 0000000,c944a13..2ae18cf
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@@ -1,0 -1,314 +1,316 @@@
+ /*
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+ 
+       http://www.apache.org/licenses/LICENSE-2.0
+ 
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  */
+ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
+ 
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.Warehouse;
+ import org.apache.hadoop.hive.metastore.api.Database;
+ import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+ import org.apache.hadoop.hive.ql.exec.Task;
+ import org.apache.hadoop.hive.ql.exec.TaskFactory;
+ import org.apache.hadoop.hive.ql.exec.Utilities;
+ import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+ import org.apache.hadoop.hive.ql.metadata.HiveException;
+ import org.apache.hadoop.hive.ql.metadata.Partition;
+ import org.apache.hadoop.hive.ql.metadata.Table;
+ import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+ import org.apache.hadoop.hive.ql.parse.SemanticException;
+ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+ import org.apache.hadoop.hive.ql.plan.DDLWork;
+ import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+ import org.apache.hadoop.hive.ql.plan.MoveWork;
++import org.apache.hadoop.hive.ql.session.SessionState;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.IOException;
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ 
+ import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
+ import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
+ import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString;
+ 
+ public class LoadPartitions {
+   private static Logger LOG = LoggerFactory.getLogger(LoadPartitions.class);
+ 
+   private final Context context;
+   private final ReplLogger replLogger;
+   private final TableContext tableContext;
+   private final TableEvent event;
+   private final TaskTracker tracker;
+   private final AddPartitionDesc lastReplicatedPartition;
+ 
+   private final ImportTableDesc tableDesc;
+   private Table table;
+ 
+   public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
+                         TableEvent event, String dbNameToLoadIn,
+                         TableContext tableContext) throws HiveException, IOException {
+     this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null);
+   }
+ 
+   public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
+                         TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
+                         AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException {
+     this.tracker = new TaskTracker(limiter);
+     this.event = event;
+     this.context = context;
+     this.replLogger = replLogger;
+     this.lastReplicatedPartition = lastReplicatedPartition;
+     this.tableContext = tableContext;
+ 
+     this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn));
+     this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+   }
+ 
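+   /**
+    * Resolves the table location on the replica warehouse: the default table path under the
+    * existing parent database, or, when loading may run ahead of the database creation
+    * (waitOnPrecursor), a path built from the default database path and the encoded table name.
+    */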
+   private String location() throws MetaException, HiveException {
+     Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+     if (!tableContext.waitOnPrecursor()) {
+       return context.warehouse.getDefaultTablePath(parentDb, tableDesc.getTableName()).toString();
+     } else {
+       Path tablePath = new Path(
+           context.warehouse.getDefaultDatabasePath(tableDesc.getDatabaseName()),
+           MetaStoreUtils.encodeTableName(tableDesc.getTableName().toLowerCase())
+       );
+       return context.warehouse.getDnsPath(tablePath).toString();
+     }
+   }
+ 
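+   /**
+    * Creates the task that logs the replication state for this table and wires it to run after
+    * the tasks tracked so far, or adds it directly when no tasks have been tracked yet.
+    */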
+   private void createTableReplLogTask() throws SemanticException {
+     ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,
+                                             tableDesc.getTableName(), tableDesc.tableType());
+     Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);
+ 
+     if (tracker.tasks().isEmpty()) {
+       tracker.addTask(replLogTask);
+     } else {
+       ReplLoadTask.dependency(tracker.tasks(), replLogTask);
+ 
+       List<Task<? extends Serializable>> visited = new ArrayList<>();
+       tracker.updateTaskCount(replLogTask, visited);
+     }
+   }
+ 
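+   /**
+    * Builds the tasks needed to load this table's partitions, for both a table that does not yet
+    * exist on the replica and an existing partitioned table. The table-level repl log task is
+    * added only when no partition load work is left pending for the next cycle, and the tracker
+    * carries a replication state when the task limit has been reached.
+    */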
+   public TaskTracker tasks() throws SemanticException {
+     try {
+       /*
+       We resolve the table location both in load table and in load partitions.
+        */
+       if (tableDesc.getLocation() == null) {
+         tableDesc.setLocation(location());
+       }
+ 
+       if (table == null) {
+         //new table
+ 
+         table = new Table(tableDesc.getDatabaseName(), tableDesc.getTableName());
+         if (isPartitioned(tableDesc)) {
+           updateReplicationState(initialReplicationState());
+           if (!forNewTable().hasReplicationState()) {
+             // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+             createTableReplLogTask();
+           }
+           return tracker;
+         }
+       } else {
+         // existing
+ 
+         if (table.isPartitioned()) {
+           List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
+           if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
+             updateReplicationState(initialReplicationState());
+             if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
+               // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+               createTableReplLogTask();
+             }
+             return tracker;
+           }
+         }
+       }
+       return tracker;
+     } catch (Exception e) {
+       throw new SemanticException(e);
+     }
+   }
+ 
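+   // Record the state to resume from only when the tracker cannot accept any more tasks.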
+   private void updateReplicationState(ReplicationState replicationState) throws SemanticException {
+     if (!tracker.canAddMoreTasks()) {
+       tracker.setReplicationState(replicationState);
+     }
+   }
+ 
+   private ReplicationState initialReplicationState() throws SemanticException {
+     return new ReplicationState(
+         new PartitionState(tableDesc.getTableName(), lastReplicatedPartition)
+     );
+   }
+ 
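+   /**
+    * Adds one add-partition task per partition descriptor until the tracker reaches its task
+    * limit, recording the partition to resume from when descriptors remain unprocessed.
+    */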
+   private TaskTracker forNewTable() throws Exception {
+     Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
+     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
+       AddPartitionDesc addPartitionDesc = iterator.next();
+       tracker.addTask(addSinglePartition(table, addPartitionDesc));
+       if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
+         ReplicationState currentReplicationState =
+                 new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
+         updateReplicationState(currentReplicationState);
+       }
+     }
+     return tracker;
+   }
+ 
+   /**
+    * returns the root task for adding a partition
+    */
+   private Task<? extends Serializable> addSinglePartition(Table table,
+       AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException {
+     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
+     Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
+     Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+     partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+     LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+         + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+         + partSpec.getLocation());
+     Path tmpPath = context.utils.getExternalTmpPath(replicaWarehousePartitionLocation);
+ 
+     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+         event.replicationSpec(),
+         sourceWarehousePartitionLocation,
+         tmpPath,
+         context.hiveConf
+     );
+ 
+     Task<?> addPartTask = TaskFactory.get(
+         new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
+         context.hiveConf
+     );
+ 
+     Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath);
+ 
+     copyTask.addDependentTask(addPartTask);
+     addPartTask.addDependentTask(movePartitionTask);
+     return copyTask;
+   }
+ 
+   /**
+    * This will create the move of partition data from temp path to actual path
+    */
+   private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec,
+       Path tmpPath) {
+     LoadTableDesc loadTableWork = new LoadTableDesc(
+         tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
 -        event.replicationSpec().isReplace()
++        event.replicationSpec().isReplace(), SessionState.get().getTxnMgr().getCurrentTxnId()
+     );
+     loadTableWork.setInheritTableSpecs(false);
+     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
+     return TaskFactory.get(work, context.hiveConf);
+   }
+ 
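+   /**
+    * Computes the partition data location on the replica: under the table descriptor's location
+    * if set, otherwise under the table's data location, otherwise under the default warehouse
+    * path for the table.
+    */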
+   private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec)
+       throws MetaException, HiveException, IOException {
+     String child = Warehouse.makePartPath(partSpec.getPartSpec());
+     if (tableDesc.getLocation() == null) {
+       if (table.getDataLocation() == null) {
+         Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+         return new Path(
+             context.warehouse.getDefaultTablePath(parentDb, tableDesc.getTableName()), child);
+       } else {
+         return new Path(table.getDataLocation().toString(), child);
+       }
+     } else {
+       return new Path(tableDesc.getLocation(), child);
+     }
+   }
+ 
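+   /**
+    * Returns a DDL task that alters an existing partition in place (replace mode), carrying the
+    * replication spec when in replication scope and reusing the partition's current location.
+    */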
+   private Task<? extends Serializable> alterSinglePartition(AddPartitionDesc desc,
+       ReplicationSpec replicationSpec, Partition ptn) {
+     desc.setReplaceMode(true);
+     if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
+       desc.setReplicationSpec(replicationSpec);
+     }
+     desc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
+     return TaskFactory.get(
+         new DDLWork(new HashSet<>(), new HashSet<>(), desc),
+         context.hiveConf
+     );
+   }
+ 
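+   /**
+    * Walks the partition descriptors of an existing table, skipping everything up to and
+    * including the last replicated partition, and then adds the remaining partitions or, for
+    * metadata-only replication of a partition that already exists, alters it in place, subject
+    * to the replication spec allowing replacement.
+    */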
+   private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception {
+     boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null);
+     ReplicationSpec replicationSpec = event.replicationSpec();
+     LOG.debug("table partitioned");
+ 
+     Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
+     while (iterator.hasNext()) {
+       /*
+       encounteredTheLastReplicatedPartition is set when we stopped creating partition tasks for a
+       table because we had reached the limit on the number of tasks to create for execution.
+       In that case, on the next run we have to iterate over the partition descriptors until we
+       reach the last replicated partition, so that we can resume replicating partitions after it.
+        */
+       AddPartitionDesc addPartitionDesc = iterator.next();
+       if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) {
+         Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+         Partition ptn;
+ 
+         if ((ptn = context.hiveDb.getPartition(table, partSpec, false)) == null) {
+           if (!replicationSpec.isMetadataOnly()) {
+             forNewTable();
+           }
+         } else {
+           // If replicating, an already existing partition means we may need to replace it,
+           // provided the destination ptn's repl.last.id is older than the replacement's.
+           if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
+             if (replicationSpec.isMetadataOnly()) {
+               tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
+               if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
+                 tracker.setReplicationState(
+                     new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)
+                     )
+                 );
+               }
+             } else {
+               forNewTable();
+             }
+           } else {
+             // ignore this ptn, do nothing, not an error.
+           }
+         }
+       } else {
+         Map<String, String> currentSpec = addPartitionDesc.getPartition(0).getPartSpec();
+         Map<String, String> lastReplicatedPartSpec =
+             lastPartitionReplicated.getPartition(0).getPartSpec();
+         encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
+       }
+     }
+     return tracker;
+   }
+ }
+