Posted to commits@hive.apache.org by se...@apache.org on 2017/10/13 00:15:57 UTC
[18/50] [abbrv] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --cc common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ff8a7aa,e4b09a2..21c4cc9
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@@ -1864,14 -1855,9 +1857,16 @@@ public class HiveConf extends Configura
" of the lock manager is dumped to log file. This is for debugging. See also " +
"hive.lock.numretries and hive.lock.sleep.between.retries."),
- HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
+ HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
- "This is intended to be used as an internal property for future versions of ACID. (See\n" +
- "HIVE-14035 for details.)"),
+ "Sets the operational properties that control the appropriate behavior for various\n"
+ + "versions of the Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
+ + "for future versions of ACID. (See HIVE-14035 for details.)\n"
+ + "0: Turn on the legacy mode for ACID\n"
+ + "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n"
+ + "2: Hash-based merge, which combines delta files using GRACE hash join based approach (not implemented)\n"
- + "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing."),
++ + "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n"
++ + "This is intended to be used as an internal property for future versions of ACID. (See\n" +
++ "HIVE-14035 for details.)"),
HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 913e333,5812a1b..ab5f7b7
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@@ -192,13 -194,14 +197,13 @@@ public class HiveMetaStore extends Thri
};
};
- /**
- * default port on which to start the Hive server
- */
public static final String ADMIN = "admin";
public static final String PUBLIC = "public";
+ /** MM write states. */
+ public static final char MM_WRITE_OPEN = 'o', MM_WRITE_COMMITTED = 'c', MM_WRITE_ABORTED = 'a';
private static HadoopThriftAuthBridge.Server saslServer;
- private static HiveDelegationTokenManager delegationTokenManager;
+ private static MetastoreDelegationTokenManager delegationTokenManager;
private static boolean useSasl;
public static final String NO_FILTER_STRING = "";
@@@ -437,41 -495,14 +497,15 @@@
}
}
- //Start Metrics for Embedded mode
+ //Start Metrics
if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
- try {
- MetricsFactory.init(hiveConf);
- } catch (Exception e) {
- // log exception, but ignore inability to start
- LOG.error("error in Metrics init: " + e.getClass().getName() + " "
- + e.getMessage(), e);
- }
- }
-
- Metrics metrics = MetricsFactory.getInstance();
- if (metrics != null && hiveConf.getBoolVar(ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED)) {
LOG.info("Begin calculating metadata count metrics.");
+ Metrics.initialize(hiveConf);
+ databaseCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_DATABASES);
+ tableCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_TABLES);
+ partCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_PARTITIONS);
updateMetrics();
- LOG.info("Finished metadata count metrics: " + initDatabaseCount + " databases, " + initTableCount +
- " tables, " + initPartCount + " partitions.");
- metrics.addGauge(MetricsConstant.INIT_TOTAL_DATABASES, new MetricsVariable<Object>() {
- @Override
- public Object getValue() {
- return initDatabaseCount;
- }
- });
- metrics.addGauge(MetricsConstant.INIT_TOTAL_TABLES, new MetricsVariable<Object>() {
- @Override
- public Object getValue() {
- return initTableCount;
- }
- });
- metrics.addGauge(MetricsConstant.INIT_TOTAL_PARTITIONS, new MetricsVariable<Object>() {
- @Override
- public Object getValue() {
- return initPartCount;
- }
- });
++
}
preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 8e89477,70451c4..37da2f8
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@@ -59,12 -60,11 +60,12 @@@ import org.apache.hadoop.hive.conf.Hive
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+ import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
- import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
- import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TApplicationException;
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index e7bbd62,bbe13fd..49005b9
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@@ -1981,59 -1967,156 +1970,212 @@@ public class MetaStoreUtils
return cols;
}
+ // TODO The following two utility methods can be moved to AcidUtils once no class in metastore is relying on them,
 + // right now ObjectStore.getAllMmTablesForCleanup is calling these methods
+ /**
+ * Checks if a table is an ACID table that only supports INSERT, but not UPDATE/DELETE
+ * @param params table properties
+ * @return true if table is an INSERT_ONLY table, false otherwise
+ */
+ // TODO# also check that transactional is true
+ public static boolean isInsertOnlyTable(Map<String, String> params) {
+ return isInsertOnlyTable(params, false);
+ }
+
+ public static boolean isInsertOnlyTable(Map<String, String> params, boolean isCtas) {
+ String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+ return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
+ }
+
+ public static boolean isInsertOnlyTable(Properties params) {
+ // TODO# redirect for now - fix before merge
+ HashMap<String, String> testMap = new HashMap<String, String>();
+ for (String n : params.stringPropertyNames()) {
+ testMap.put(n, params.getProperty(n));
+ }
+ return isInsertOnlyTable(testMap);
+ }
+
+ /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */
+ public static Boolean isToInsertOnlyTable(Map<String, String> props) {
+ // TODO# Setting these separately is a very hairy issue in certain combinations, since we
+ // cannot decide what type of table this becomes without taking both into account, and
+ // in many cases the conversion might be illegal.
+ // The only thing we allow is tx = true w/o tx-props, for backward compat.
+ String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+ if (transactional == null && transactionalProp == null) return null; // Not affected.
+ boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
+ if (transactionalProp == null) {
+ if (isSetToTxn) return false; // Assume the full ACID table.
+ throw new RuntimeException("Cannot change '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
+ + "' without '" + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "'");
+ }
+ if (!"insert_only".equalsIgnoreCase(transactionalProp)) return false; // Not MM.
+ if (!isSetToTxn) {
+ throw new RuntimeException("Cannot set '"
+ + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "' to 'insert_only' without "
+ + "setting '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL + "' to 'true'");
+ }
+ return true;
+ }
+
+ public static boolean isRemovedInsertOnlyTable(Set<String> removedSet) {
+ boolean hasTxn = removedSet.contains(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL),
+ hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+ return hasTxn || hasProps;
+ }
++
+ // given a list of partStats, this function will give you an aggr stats
+ public static List<ColumnStatisticsObj> aggrPartitionStats(List<ColumnStatistics> partStats,
+ String dbName, String tableName, List<String> partNames, List<String> colNames,
+ boolean useDensityFunctionForNDVEstimation, double ndvTuner)
+ throws MetaException {
+ // 1. group by the stats by colNames
+ // map the colName to List<ColumnStatistics>
+ Map<String, List<ColumnStatistics>> map = new HashMap<>();
+ for (ColumnStatistics css : partStats) {
+ List<ColumnStatisticsObj> objs = css.getStatsObj();
+ for (ColumnStatisticsObj obj : objs) {
+ List<ColumnStatisticsObj> singleObj = new ArrayList<>();
+ singleObj.add(obj);
+ ColumnStatistics singleCS = new ColumnStatistics(css.getStatsDesc(), singleObj);
+ if (!map.containsKey(obj.getColName())) {
+ map.put(obj.getColName(), new ArrayList<ColumnStatistics>());
+ }
+ map.get(obj.getColName()).add(singleCS);
+ }
+ }
+ return aggrPartitionStats(map,dbName,tableName,partNames,colNames,useDensityFunctionForNDVEstimation, ndvTuner);
+ }
+
+ public static List<ColumnStatisticsObj> aggrPartitionStats(
+ Map<String, List<ColumnStatistics>> map, String dbName, String tableName,
+ final List<String> partNames, List<String> colNames,
+ final boolean useDensityFunctionForNDVEstimation,final double ndvTuner) throws MetaException {
+ List<ColumnStatisticsObj> colStats = new ArrayList<>();
+ // 2. Aggregate stats for each column in a separate thread
+ if (map.size()< 1) {
+ //stats are absent in RDBMS
+ LOG.debug("No stats data found for: dbName=" +dbName +" tblName=" + tableName +
+ " partNames= " + partNames + " colNames=" + colNames );
+ return colStats;
+ }
+ final ExecutorService pool = Executors.newFixedThreadPool(Math.min(map.size(), 16),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aggr-col-stats-%d").build());
+ final List<Future<ColumnStatisticsObj>> futures = Lists.newLinkedList();
+
+ long start = System.currentTimeMillis();
+ for (final Entry<String, List<ColumnStatistics>> entry : map.entrySet()) {
+ futures.add(pool.submit(new Callable<ColumnStatisticsObj>() {
+ @Override
+ public ColumnStatisticsObj call() throws Exception {
+ List<ColumnStatistics> css = entry.getValue();
+ ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css
+ .iterator().next().getStatsObj().iterator().next().getStatsData().getSetField(),
+ useDensityFunctionForNDVEstimation, ndvTuner);
+ ColumnStatisticsObj statsObj = aggregator.aggregate(entry.getKey(), partNames, css);
+ return statsObj;
+ }}));
+ }
+ pool.shutdown();
+ for (Future<ColumnStatisticsObj> future : futures) {
+ try {
+ colStats.add(future.get());
+ } catch (InterruptedException | ExecutionException e) {
+ pool.shutdownNow();
+ LOG.debug(e.toString());
+ throw new MetaException(e.toString());
+ }
+ }
+ LOG.debug("Time for aggr col stats in seconds: {} Threads used: {}",
+ ((System.currentTimeMillis() - (double)start))/1000, Math.min(map.size(), 16));
+ return colStats;
+ }
+
+
+ /**
+ * Produce a hash for the storage descriptor
+ * @param sd storage descriptor to hash
+ * @param md message descriptor to use to generate the hash
+ * @return the hash as a byte array
+ */
+ public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
+ // Note all maps and lists have to be absolutely sorted. Otherwise we'll produce different
+ // results for hashes based on the OS or JVM being used.
+ md.reset();
+ for (FieldSchema fs : sd.getCols()) {
+ md.update(fs.getName().getBytes(ENCODING));
+ md.update(fs.getType().getBytes(ENCODING));
+ if (fs.getComment() != null) md.update(fs.getComment().getBytes(ENCODING));
+ }
+ if (sd.getInputFormat() != null) {
+ md.update(sd.getInputFormat().getBytes(ENCODING));
+ }
+ if (sd.getOutputFormat() != null) {
+ md.update(sd.getOutputFormat().getBytes(ENCODING));
+ }
+ md.update(sd.isCompressed() ? "true".getBytes(ENCODING) : "false".getBytes(ENCODING));
+ md.update(Integer.toString(sd.getNumBuckets()).getBytes(ENCODING));
+ if (sd.getSerdeInfo() != null) {
+ SerDeInfo serde = sd.getSerdeInfo();
+ if (serde.getName() != null) {
+ md.update(serde.getName().getBytes(ENCODING));
+ }
+ if (serde.getSerializationLib() != null) {
+ md.update(serde.getSerializationLib().getBytes(ENCODING));
+ }
+ if (serde.getParameters() != null) {
+ SortedMap<String, String> params = new TreeMap<>(serde.getParameters());
+ for (Entry<String, String> param : params.entrySet()) {
+ md.update(param.getKey().getBytes(ENCODING));
+ md.update(param.getValue().getBytes(ENCODING));
+ }
+ }
+ }
+ if (sd.getBucketCols() != null) {
+ List<String> bucketCols = new ArrayList<>(sd.getBucketCols());
+ for (String bucket : bucketCols) md.update(bucket.getBytes(ENCODING));
+ }
+ if (sd.getSortCols() != null) {
+ SortedSet<Order> orders = new TreeSet<>(sd.getSortCols());
+ for (Order order : orders) {
+ md.update(order.getCol().getBytes(ENCODING));
+ md.update(Integer.toString(order.getOrder()).getBytes(ENCODING));
+ }
+ }
+ if (sd.getSkewedInfo() != null) {
+ SkewedInfo skewed = sd.getSkewedInfo();
+ if (skewed.getSkewedColNames() != null) {
+ SortedSet<String> colnames = new TreeSet<>(skewed.getSkewedColNames());
+ for (String colname : colnames) md.update(colname.getBytes(ENCODING));
+ }
+ if (skewed.getSkewedColValues() != null) {
+ SortedSet<String> sortedOuterList = new TreeSet<>();
+ for (List<String> innerList : skewed.getSkewedColValues()) {
+ SortedSet<String> sortedInnerList = new TreeSet<>(innerList);
+ sortedOuterList.add(StringUtils.join(sortedInnerList, "."));
+ }
+ for (String colval : sortedOuterList) md.update(colval.getBytes(ENCODING));
+ }
+ if (skewed.getSkewedColValueLocationMaps() != null) {
+ SortedMap<String, String> sortedMap = new TreeMap<>();
+ for (Entry<List<String>, String> smap : skewed.getSkewedColValueLocationMaps().entrySet()) {
+ SortedSet<String> sortedKey = new TreeSet<>(smap.getKey());
+ sortedMap.put(StringUtils.join(sortedKey, "."), smap.getValue());
+ }
+ for (Entry<String, String> e : sortedMap.entrySet()) {
+ md.update(e.getKey().getBytes(ENCODING));
+ md.update(e.getValue().getBytes(ENCODING));
+ }
+ }
+ md.update(sd.isStoredAsSubDirectories() ? "true".getBytes(ENCODING) : "false".getBytes(ENCODING));
+ }
+
+ return md.digest();
+ }
+
+ public static double decimalToDouble(Decimal decimal) {
+ return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue();
+ }
}
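The isToInsertOnlyTable contract added above is easiest to see in a small, hypothetical usage sketch (the property map is made up for the example; constant names and return semantics follow the method in this hunk):

    Map<String, String> props = new HashMap<>();
    props.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
    props.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "insert_only");
    Boolean result = MetaStoreUtils.isToInsertOnlyTable(props);
    // Boolean.TRUE     -> the ALTER converts the table to MM (insert-only)
    // Boolean.FALSE    -> transactional change, but not to MM (e.g. full ACID)
    // null             -> neither property is being set, MM status unaffected
    // RuntimeException -> inconsistent combination, e.g. setting
    //                     'transactional_properties'='insert_only' without 'transactional'='true'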
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 88afe03,0db1bc0..903b2c7
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@@ -59,9 -58,11 +59,12 @@@ import javax.jdo.PersistenceManagerFact
import javax.jdo.Query;
import javax.jdo.Transaction;
import javax.jdo.datastore.DataStoreCache;
+import javax.jdo.datastore.JDOConnection;
import javax.jdo.identity.IntIdentity;
+ import javax.sql.DataSource;
+ import com.codahale.metrics.Counter;
+ import com.codahale.metrics.MetricRegistry;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configurable;
@@@ -415,10 -431,13 +434,14 @@@ public class ObjectStore implements Raw
pm = getPersistenceManager();
isInitialized = pm != null;
if (isInitialized) {
+ dbType = determineDatabaseProduct();
expressionProxy = createExpressionProxy(hiveConf);
if (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)) {
- directSql = new MetaStoreDirectSql(pm, hiveConf, dbType);
+ String schema = prop.getProperty("javax.jdo.mapping.Schema");
+ if (schema != null && schema.isEmpty()) {
+ schema = null;
+ }
+ directSql = new MetaStoreDirectSql(pm, hiveConf, schema);
}
}
LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
@@@ -2733,8 -2759,7 +2783,8 @@@
boolean isConfigEnabled = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)
&& (HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL_DDL) || !isInTxn);
if (isConfigEnabled && directSql == null) {
- directSql = new MetaStoreDirectSql(pm, getConf());
+ dbType = determineDatabaseProduct();
- directSql = new MetaStoreDirectSql(pm, getConf(), dbType);
++ directSql = new MetaStoreDirectSql(pm, getConf(), "");
}
if (!allowJdo && isConfigEnabled && !directSql.isCompatibleDatastore()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index ed05381,3a3d184..dabede4
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@@ -39,8 -39,6 +39,7 @@@ public final class TransactionalValidat
// These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
- public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
+ public static final String INSERTONLY_TRANSACTIONAL_PROPERTY = "insert_only";
TransactionalValidationListener(Configuration conf) {
super(conf);
@@@ -104,13 -114,10 +115,13 @@@
//normalize prop name
parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
}
- if ("true".equalsIgnoreCase(transactionalValue)) {
+ if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
 + // only need to check conformance if ALTER TABLE enabled ACID
if (!conformToAcid(newTable)) {
- throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
+ // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing
+ if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) {
- throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
- " format (such as ORC)");
++ throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
+ }
}
if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@@ -204,13 -194,9 +204,12 @@@
return;
}
- if ("true".equalsIgnoreCase(transactionalValue)) {
+ if ("true".equalsIgnoreCase(transactional)) {
if (!conformToAcid(newTable)) {
- throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
+ // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing
+ if (transactionalProperties == null || !"insert_only".equalsIgnoreCase(transactionalProperties)) {
- throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
- " format (such as ORC)");
++ throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
+ }
}
if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@@ -289,8 -273,6 +286,7 @@@
boolean isValid = false;
switch (transactionalProperties) {
case DEFAULT_TRANSACTIONAL_PROPERTY:
- case LEGACY_TRANSACTIONAL_PROPERTY:
+ case INSERTONLY_TRANSACTIONAL_PROPERTY:
isValid = true;
break;
default:
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 906591d,93d1ba6..a97ba1a
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@@ -18,7 -18,11 +18,16 @@@
package org.apache.hadoop.hive.metastore.cache;
import java.nio.ByteBuffer;
- import java.util.*;
+ import java.util.ArrayList;
++import java.util.ArrayList;
++import java.util.HashMap;
+ import java.util.HashMap;
+ import java.util.LinkedList;
++import java.util.LinkedList;
+ import java.util.List;
++import java.util.List;
++import java.util.Map;
+ import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --cc metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 6d851a0,fb16cfc..0f7827b
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@@ -110,7 -114,14 +112,12 @@@ public class DummyRawStoreForJdoConnect
}
@Override
+ public boolean isActiveTransaction() {
+ return false;
+ }
+
+ @Override
public void rollbackTransaction() {
-
-
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --cc metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 9542990,08228af..7573fb2
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@@ -379,13 -362,14 +377,14 @@@ public class TestObjectStore
public void testDirectSqlErrorMetrics() throws Exception {
HiveConf conf = new HiveConf();
conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
- conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name()
- + "," + MetricsReporting.JMX.name());
+ Metrics.initialize(conf);
- MetricsFactory.init(conf);
- CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance();
 + // re-run setUp so that we get an object store with the metrics initialized
+ setUp();
+ Counter directSqlErrors =
+ Metrics.getRegistry().getCounters().get(MetricsConstants.DIRECTSQL_ERRORS);
- objectStore.new GetDbHelper("foo", null, true, true) {
+ objectStore.new GetDbHelper("foo", true, true) {
@Override
protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException {
return null;
@@@ -398,11 -382,9 +397,9 @@@
}
}.run(false);
- String json = metrics.dumpJson();
- MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER,
- MetricsConstant.DIRECTSQL_ERRORS, "");
+ Assert.assertEquals(0, directSqlErrors.getCount());
- objectStore.new GetDbHelper("foo", null, true, true) {
+ objectStore.new GetDbHelper("foo", true, true) {
@Override
protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException {
throw new RuntimeException();
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 82a9fad,acc2390..b6eff94
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@@ -3846,11 -3867,42 +3884,22 @@@ public class DDLTask extends Task<DDLWo
throw new HiveException(ErrorMsg.REPLACE_CANNOT_DROP_COLUMNS, alterTbl.getOldName());
}
}
+
+ boolean partitioned = tbl.isPartitioned();
+ boolean droppingColumns = alterTbl.getNewCols().size() < sd.getCols().size();
+ if (ParquetHiveSerDe.isParquetTable(tbl) &&
+ isSchemaEvolutionEnabled(tbl) &&
+ !alterTbl.getIsCascade() &&
+ droppingColumns && partitioned) {
+ LOG.warn("Cannot drop columns from a partitioned parquet table without the CASCADE option");
+ throw new HiveException(ErrorMsg.REPLACE_CANNOT_DROP_COLUMNS,
+ alterTbl.getOldName());
+ }
sd.setCols(alterTbl.getNewCols());
} else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) {
- if (StatsSetupConst.USER.equals(environmentContext.getProperties()
- .get(StatsSetupConst.STATS_GENERATED))) {
- environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
- }
- if (part != null) {
- part.getTPartition().getParameters().putAll(alterTbl.getProps());
- } else {
- tbl.getTTable().getParameters().putAll(alterTbl.getProps());
- }
+ return alterTableAddProps(alterTbl, tbl, part, environmentContext);
} else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.DROPPROPS) {
- Iterator<String> keyItr = alterTbl.getProps().keySet().iterator();
- if (StatsSetupConst.USER.equals(environmentContext.getProperties()
- .get(StatsSetupConst.STATS_GENERATED))) {
- // drop a stats parameter, which triggers recompute stats update automatically
- environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
- }
- while (keyItr.hasNext()) {
- if (part != null) {
- part.getTPartition().getParameters().remove(keyItr.next());
- } else {
- tbl.getTTable().getParameters().remove(keyItr.next());
- }
- }
+ return alterTableDropProps(alterTbl, tbl, part, environmentContext);
} else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDEPROPS) {
StorageDescriptor sd = retrieveStorageDescriptor(tbl, part);
sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps());
@@@ -4002,219 -4054,7 +4051,215 @@@
throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, alterTbl.getOp().toString());
}
- return 0;
+ return null;
+ }
+
+ private List<Task<?>> alterTableDropProps(AlterTableDesc alterTbl, Table tbl,
+ Partition part, EnvironmentContext environmentContext) throws HiveException {
+ if (StatsSetupConst.USER.equals(environmentContext.getProperties()
+ .get(StatsSetupConst.STATS_GENERATED))) {
+ // drop a stats parameter, which triggers recompute stats update automatically
+ environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
+ }
+
+ List<Task<?>> result = null;
+ if (part == null) {
+ Set<String> removedSet = alterTbl.getProps().keySet();
+ boolean isFromMmTable = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()),
+ isRemoved = MetaStoreUtils.isRemovedInsertOnlyTable(removedSet);
+ if (isFromMmTable && isRemoved) {
+ result = generateRemoveMmTasks(tbl);
+ }
+ }
+ Iterator<String> keyItr = alterTbl.getProps().keySet().iterator();
+ while (keyItr.hasNext()) {
+ if (part != null) {
+ part.getTPartition().getParameters().remove(keyItr.next());
+ } else {
+ tbl.getTTable().getParameters().remove(keyItr.next());
+ }
+ }
+ return result;
+ }
+
+ private List<Task<?>> generateRemoveMmTasks(Table tbl) throws HiveException {
+ // To avoid confusion from nested MM directories when table is converted back and forth, we
+ // want to rename mm_ dirs to remove the prefix; however, given the unpredictable nested
+ // directory handling in Hive/MR, we will instead move all the files into the root directory.
+ // We will also delete any directories that are not committed.
+ // Note that this relies on locks. Note also that we only do the renames AFTER the metastore
+ // operation commits. Deleting uncommitted things is safe, but moving stuff before we convert
+ // could cause data loss.
+ List<Path> allMmDirs = new ArrayList<>();
+ if (tbl.isStoredAsSubDirectories()) {
+ // TODO: support this? we only bail because it's a PITA and hardly anyone seems to care.
+ throw new HiveException("Converting list bucketed tables stored as subdirectories "
+ + " to and from MM is not supported");
+ }
+ List<String> bucketCols = tbl.getBucketCols();
+ if (bucketCols != null && !bucketCols.isEmpty()
+ && HiveConf.getBoolVar(conf, ConfVars.HIVE_STRICT_CHECKS_BUCKETING)) {
+ throw new HiveException("Converting bucketed tables from MM is not supported by default; "
+ + "copying files from multiple MM directories may potentially break the buckets. You "
+ + "can set " + ConfVars.HIVE_STRICT_CHECKS_BUCKETING.varname
+ + " to false for this query if you want to force the conversion.");
+ }
+ Hive db = getHive();
+ String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ ValidTxnList validTxnList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
+ if (tbl.getPartitionKeys().size() > 0) {
+ PartitionIterable parts = new PartitionIterable(db, tbl, null,
+ HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+ Iterator<Partition> partIter = parts.iterator();
+ while (partIter.hasNext()) {
+ Partition part = partIter.next();
+ checkMmLb(part);
+ handleRemoveMm(part.getDataLocation(), validTxnList, allMmDirs);
+ }
+ } else {
+ checkMmLb(tbl);
+ handleRemoveMm(tbl.getDataLocation(), validTxnList, allMmDirs);
+ }
+ List<Path> targetPaths = new ArrayList<>(allMmDirs.size());
+ List<String> targetPrefix = new ArrayList<>(allMmDirs.size());
+ int prefixLen = JavaUtils.DELTA_PREFIX.length();
+ for (int i = 0; i < allMmDirs.size(); ++i) {
+ Path src = allMmDirs.get(i);
+ Path tgt = src.getParent();
+ String prefix = src.getName().substring(prefixLen + 1) + "_";
+ Utilities.LOG14535.info("Will move " + src + " to " + tgt + " (prefix " + prefix + ")");
+ targetPaths.add(tgt);
+ targetPrefix.add(prefix);
+ }
+ // Don't set inputs and outputs - the locks have already been taken so it's pointless.
+ MoveWork mw = new MoveWork(null, null, null, null, false);
+ mw.setMultiFilesDesc(new LoadMultiFilesDesc(
+ allMmDirs, targetPaths, targetPrefix, true, null, null));
+ return Lists.<Task<?>>newArrayList(TaskFactory.get(mw, conf));
+ }
+
+ private void checkMmLb(Table tbl) throws HiveException {
+ if (!tbl.isStoredAsSubDirectories()) return;
+ // TODO: support this?
+ throw new HiveException("Converting list bucketed tables stored as subdirectories "
+ + " to and from MM is not supported");
+ }
+
+ private void checkMmLb(Partition part) throws HiveException {
+ if (!part.isStoredAsSubDirectories()) return;
+ // TODO: support this?
+ throw new HiveException("Converting list bucketed tables stored as subdirectories "
+ + " to and from MM is not supported. Please create a table in the desired format.");
+ }
+
+ private void handleRemoveMm(
+ Path path, ValidTxnList validTxnList, List<Path> result) throws HiveException {
+ // Note: doesn't take LB into account; that is not presently supported here (throws above).
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ for (FileStatus file : fs.listStatus(path)) {
+ Path childPath = file.getPath();
+ if (!file.isDirectory()) {
+ ensureDelete(fs, childPath, "a non-directory file");
+ continue;
+ }
+ Long writeId = JavaUtils.extractTxnId(childPath);
+ if (writeId == null) {
+ ensureDelete(fs, childPath, "an unknown directory");
+ } else if (!validTxnList.isTxnValid(writeId)) {
+ // Assume no concurrent active writes - we rely on locks here. We could check and fail.
+ ensureDelete(fs, childPath, "an uncommitted directory");
+ } else {
+ result.add(childPath);
+ }
+ }
+ } catch (IOException ex) {
+ throw new HiveException(ex);
+ }
+ }
+
+ private static void ensureDelete(FileSystem fs, Path path, String what) throws IOException {
+ Utilities.LOG14535.info("Deleting " + what + " " + path);
+ try {
+ if (!fs.delete(path, true)) throw new IOException("delete returned false");
+ } catch (Exception ex) {
+ String error = "Couldn't delete " + path + "; cannot remove MM setting from the table";
+ LOG.error(error, ex);
+ throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+ }
+ }
+
+ private List<Task<?>> generateAddMmTasks(Table tbl) throws HiveException {
+ // We will move all the files in the table/partition directories into the first MM
+ // directory, then commit the first write ID.
+ List<Path> srcs = new ArrayList<>(), tgts = new ArrayList<>();
+ long mmWriteId = 0;
+ try {
+ HiveTxnManager txnManager = SessionState.get().getTxnMgr();
+ if (txnManager.isTxnOpen()) {
+ mmWriteId = txnManager.getCurrentTxnId();
+ } else {
+ mmWriteId = txnManager.openTxn(new Context(conf), conf.getUser());
+ txnManager.commitTxn();
+ }
+ } catch (Exception e) {
+ String errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
+ console.printError(errorMessage, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ int stmtId = 0;
+ String mmDir = AcidUtils.deltaSubdir(mmWriteId, mmWriteId, stmtId);
+ Hive db = getHive();
+ if (tbl.getPartitionKeys().size() > 0) {
+ PartitionIterable parts = new PartitionIterable(db, tbl, null,
+ HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+ Iterator<Partition> partIter = parts.iterator();
+ while (partIter.hasNext()) {
+ Partition part = partIter.next();
+ checkMmLb(part);
+ Path src = part.getDataLocation(), tgt = new Path(src, mmDir);
+ srcs.add(src);
+ tgts.add(tgt);
+ Utilities.LOG14535.info("Will move " + src + " to " + tgt);
+ }
+ } else {
+ checkMmLb(tbl);
+ Path src = tbl.getDataLocation(), tgt = new Path(src, mmDir);
+ srcs.add(src);
+ tgts.add(tgt);
+ Utilities.LOG14535.info("Will move " + src + " to " + tgt);
+ }
+ // Don't set inputs and outputs - the locks have already been taken so it's pointless.
+ MoveWork mw = new MoveWork(null, null, null, null, false);
+ mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null));
+ ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId);
+ Task<?> mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf);
+ mv.addDependentTask(ic);
+ return Lists.<Task<?>>newArrayList(mv);
+ }
+
+ private List<Task<?>> alterTableAddProps(AlterTableDesc alterTbl, Table tbl,
+ Partition part, EnvironmentContext environmentContext) throws HiveException {
+ if (StatsSetupConst.USER.equals(environmentContext.getProperties()
+ .get(StatsSetupConst.STATS_GENERATED))) {
+ environmentContext.getProperties().remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
+ }
- if(alterTbl.getProps().containsKey(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY)) {
- NanoTimeUtils.validateTimeZone(
- alterTbl.getProps().get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY));
- }
+ List<Task<?>> result = null;
+ if (part != null) {
+ part.getTPartition().getParameters().putAll(alterTbl.getProps());
+ } else {
+ boolean isFromMmTable = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters());
+ Boolean isToMmTable = MetaStoreUtils.isToInsertOnlyTable(alterTbl.getProps());
+ if (isToMmTable != null) {
+ if (!isFromMmTable && isToMmTable) {
+ result = generateAddMmTasks(tbl);
+ } else if (isFromMmTable && !isToMmTable) {
+ result = generateRemoveMmTasks(tbl);
+ }
+ }
+ tbl.getTTable().getParameters().putAll(alterTbl.getProps());
+ }
+ return result;
}
private int dropConstraint(Hive db, AlterTableDesc alterTbl)
@@@ -4583,22 -4426,8 +4631,8 @@@
}
}
- // If HIVE_PARQUET_INT96_DEFAULT_UTC_WRITE_ZONE is set to True, then set new Parquet tables timezone
- // to UTC by default (only if the table property is not set)
- if (ParquetHiveSerDe.isParquetTable(tbl)) {
- SessionState ss = SessionState.get();
- String parquetTimezone = tbl.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY);
- if (parquetTimezone == null || parquetTimezone.isEmpty()) {
- if (ss.getConf().getBoolVar(ConfVars.HIVE_PARQUET_INT96_DEFAULT_UTC_WRITE_ZONE)) {
- tbl.setProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
- }
- } else {
- NanoTimeUtils.validateTimeZone(parquetTimezone);
- }
- }
-
// create the table
- if (crtTbl.getReplaceMode()){
+ if (crtTbl.getReplaceMode()) {
// replace-mode creates are really alters using CreateTableDesc.
try {
db.alterTable(tbl.getDbName()+"."+tbl.getTableName(),tbl,null);
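To summarize the ALTER TABLE property flow introduced above: the MM conversion decision reduces to the isInsertOnlyTable / isToInsertOnlyTable pair. The condensed sketch below only restates the branches of alterTableAddProps for readability; it is not additional code from the commit:

    // Before-state from the table, after-state from the ALTER TABLE ... SET TBLPROPERTIES props.
    boolean isFromMm = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters());
    Boolean isToMm = MetaStoreUtils.isToInsertOnlyTable(alterTbl.getProps());
    if (isToMm != null) {
      if (!isFromMm && isToMm) {
        // plain -> MM: generateAddMmTasks moves the existing files into the first delta
        // directory (AcidUtils.deltaSubdir(writeId, writeId, stmtId)) and then commits
        // that write ID via an ImportCommitWork task chained after the MoveWork task.
      } else if (isFromMm && !isToMm) {
        // MM -> plain: generateRemoveMmTasks moves committed delta files back to the
        // table/partition root and deletes uncommitted delta directories.
      }
    }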
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 0582f94,13750cd..d2d9946
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@@ -382,14 -368,7 +379,11 @@@ public class FetchOperator implements S
Class<? extends InputFormat> formatter = currDesc.getInputFileFormatClass();
Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
- if (ParquetHiveSerDe.class.getName().equals(currDesc.getTableDesc().getSerdeClassName())) {
- ParquetTableUtils.setParquetTimeZoneIfAbsent(job, currDesc.getTableDesc().getProperties());
- }
InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+ String inputs = processCurrPathForMmWriteIds(inputFormat);
+ Utilities.LOG14535.info("Setting fetch inputs to " + inputs);
+ if (inputs == null) return null;
+ job.set("mapred.input.dir", inputs);
InputSplit[] splits = inputFormat.getSplits(job, 1);
FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 7e1b8fa,bc265eb..3544884
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@@ -41,11 -28,11 +41,13 @@@ import org.apache.hadoop.hive.common.St
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+ import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@@ -165,28 -149,11 +167,28 @@@ public class FileSinkOperator extends T
Path[] finalPaths;
RecordWriter[] outWriters;
RecordUpdater[] updaters;
- private Stat stat;
+ Stat stat;
+ int acidLastBucket = -1;
+ int acidFileOffset = -1;
+ private boolean isMmTable;
+ private Long txnId;
+ private int stmtId;
+
+ public FSPaths(Path specPath, boolean isMmTable) {
+ this.isMmTable = isMmTable;
+ if (!isMmTable) {
+ tmpPath = Utilities.toTempPath(specPath);
+ taskOutputTempPath = Utilities.toTaskTempPath(specPath);
+ } else {
+ tmpPath = specPath;
+ taskOutputTempPath = null; // Should not be used.
+ txnId = conf.getTransactionId();
+ stmtId = conf.getStatementId();
+ }
+ Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts
+ + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
+ + " (spec path " + specPath + ")"/*, new Exception()*/);
- public FSPaths(Path specPath) {
- tmpPath = Utilities.toTempPath(specPath);
- taskOutputTempPath = Utilities.toTaskTempPath(specPath);
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
outWriters = new RecordWriter[numFiles];
@@@ -881,10 -758,10 +885,10 @@@
// for a given operator branch prediction should work quite nicely on it.
// RecordUpdateer expects to get the actual row, not a serialized version of it. Thus we
// pass the row rather than recordValue.
- if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
- rowOutWriters[writerOffset].write(recordValue);
+ rowOutWriters[findWriterOffset(row)].write(recordValue);
} else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
- fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
+ fpaths.updaters[findWriterOffset(row)].insert(conf.getTransactionId(), row);
} else {
// TODO I suspect we could skip much of the stuff above this in the function in the case
// of update and delete. But I don't understand all of the side effects of the above
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e86ca3a,cde2805..34d0598
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@@ -403,232 -579,6 +436,231 @@@ public class MoveTask extends Task<Move
return (1);
}
}
+
+ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
+ TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
+ List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec());
+ db.validatePartitionNameCharacters(partVals);
+ Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
+ + " into " + tbd.getTable().getTableName());
+ db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+ tbd.getPartitionSpec(), tbd.getReplace(),
+ tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
+ !tbd.isMmTable(),
+ hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId());
+ Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+
+ // See the comment inside updatePartitionBucketSortColumns.
+ if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
+ updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
+ ti.numBuckets, ti.sortCols);
+ }
+
+ DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
+ // add this partition to post-execution hook
+ if (work.getOutputs() != null) {
+ DDLTask.addIfAbsentByName(new WriteEntity(partn,
+ getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
+ }
+ return dc;
+ }
+
+ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
+ TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
+ IOException, InvalidOperationException {
+ DataContainer dc;
+ List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
+
+ console.printInfo(System.getProperty("line.separator"));
+ long startTime = System.currentTimeMillis();
+ // load the list of DP partitions and return the list of partition specs
+ // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
+ // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
+ // After that check the number of DPs created to not exceed the limit and
+ // iterate over it and call loadPartition() here.
+ // The reason we don't do inside HIVE-1361 is the latter is large and we
+ // want to isolate any potential issue it may introduce.
+ if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
+ throw new HiveException("Only single-partition LoadTableDesc can skip commiting write ID");
+ }
+ Map<Map<String, String>, Partition> dp =
+ db.loadDynamicPartitions(
+ tbd.getSourcePath(),
+ tbd.getTable().getTableName(),
+ tbd.getPartitionSpec(),
+ tbd.getReplace(),
+ dpCtx.getNumDPCols(),
+ (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
+ !tbd.isMmTable(),
+ SessionState.get().getTxnMgr().getCurrentTxnId(), tbd.getStmtId(), hasFollowingStatsTask(),
+ work.getLoadTableWork().getWriteType());
+
+ // publish DP columns to its subscribers
+ if (dps != null && dps.size() > 0) {
+ pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
+ }
+
+ String loadTime = "\t Time taken to load dynamic partitions: " +
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
+ console.printInfo(loadTime);
+ LOG.info(loadTime);
+
+ if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
+ throw new HiveException("This query creates no partitions." +
+ " To turn off this error, set hive.error.on.empty.partition=false.");
+ }
+
+ startTime = System.currentTimeMillis();
+ // for each partition spec, get the partition
+ // and put it to WriteEntity for post-exec hook
+ for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+ Partition partn = entry.getValue();
+
+ // See the comment inside updatePartitionBucketSortColumns.
+ if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
+ updatePartitionBucketSortColumns(
+ db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
+ }
+
+ WriteEntity enty = new WriteEntity(partn,
+ getWriteType(tbd, work.getLoadTableWork().getWriteType()));
+ if (work.getOutputs() != null) {
+ DDLTask.addIfAbsentByName(enty, work.getOutputs());
+ }
+ // Need to update the queryPlan's output as well so that post-exec hook get executed.
 + // This is only needed for dynamic partitioning since for SP the WriteEntity is
+ // constructed at compile time and the queryPlan already contains that.
+ // For DP, WriteEntity creation is deferred at this stage so we need to update
+ // queryPlan here.
+ if (queryPlan.getOutputs() == null) {
+ queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
+ }
+ queryPlan.getOutputs().add(enty);
+
+ // update columnar lineage for each partition
+ dc = new DataContainer(table.getTTable(), partn.getTPartition());
+
+ // Don't set lineage on delete as we don't have all the columns
+ if (SessionState.get() != null &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
+ SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
+ table.getCols());
+ }
+ LOG.info("\tLoading partition " + entry.getKey());
+ }
+ console.printInfo("\t Time taken for adding to write entity : " +
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+ dc = null; // reset data container to prevent it being added again.
+ return dc;
+ }
+
+ private void inferTaskInformation(TaskInformation ti) {
+ // Find the first ancestor of this MoveTask which is some form of map reduce task
+ // (Either standard, local, or a merge)
+ while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
+ ti.task = (Task)ti.task.getParentTasks().get(0);
+ // If it was a merge task or a local map reduce task, nothing can be inferred
+ if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
+ break;
+ }
+
+ // If it's a standard map reduce task, check what, if anything, it inferred about
+ // the directory this move task is moving
+ if (ti.task instanceof MapRedTask) {
+ MapredWork work = (MapredWork)ti.task.getWork();
+ MapWork mapWork = work.getMapWork();
+ ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
+ ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
+ if (work.getReduceWork() != null) {
+ ti.numBuckets = work.getReduceWork().getNumReduceTasks();
+ }
+
+ if (ti.bucketCols != null || ti.sortCols != null) {
+ // This must be a final map reduce task (the task containing the file sink
+ // operator that writes the final output)
+ assert work.isFinalMapRed();
+ }
+ break;
+ }
+
+ // If it's a move task, get the path the files were moved from, this is what any
+ // preceding map reduce task inferred information about, and moving does not invalidate
+ // those assumptions
+ // This can happen when a conditional merge is added before the final MoveTask, but the
+ // condition for merging is not met, see GenMRFileSink1.
+ if (ti.task instanceof MoveTask) {
+ MoveTask mt = (MoveTask)ti.task;
+ if (mt.getWork().getLoadFileWork() != null) {
+ ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
+ }
+ }
+ }
+ }
+
+ private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
+ throws HiveException {
+ if (work.getCheckFileFormat()) {
+ // Get all files from the src directory
+ FileStatus[] dirs;
+ ArrayList<FileStatus> files;
+ FileSystem srcFs; // source filesystem
+ try {
+ srcFs = tbd.getSourcePath().getFileSystem(conf);
+ dirs = srcFs.globStatus(tbd.getSourcePath());
+ files = new ArrayList<FileStatus>();
+ for (int i = 0; (dirs != null && i < dirs.length); i++) {
+ files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
+ // We only check one file, so exit the loop when we have at least
+ // one.
+ if (files.size() > 0) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException(
+ "addFiles: filesystem error in check phase", e);
+ }
+
+ // handle file format check for table level
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
+ boolean flag = true;
+ // work.checkFileFormat is set to true only for Load Task, so assumption here is
+ // dynamic partition context is null
+ if (tbd.getDPCtx() == null) {
+ if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
+ // Check if the file format of the file matches that of the table.
+ flag = HiveFileFormatUtils.checkInputFormat(
+ srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
+ } else {
+ // Check if the file format of the file matches that of the partition
+ Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
+ if (oldPart == null) {
+ // this means we have just created a table and are specifying partition in the
+ // load statement (without pre-creating the partition), in which case lets use
+ // table input format class. inheritTableSpecs defaults to true so when a new
+ // partition is created later it will automatically inherit input format
+ // from table object
+ flag = HiveFileFormatUtils.checkInputFormat(
+ srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
+ } else {
+ flag = HiveFileFormatUtils.checkInputFormat(
+ srcFs, conf, oldPart.getInputFormatClass(), files);
+ }
+ }
+ if (!flag) {
- throw new HiveException(
- "Wrong file format. Please check the file's format.");
++ throw new HiveException(ErrorMsg.WRONG_FILE_FORMAT);
+ }
+ } else {
+ LOG.warn("Skipping file format check as dpCtx is not null");
+ }
+ }
+ }
+ }
+
+
/**
 * so to make sure we create WriteEntity with the right WriteType. This is (at this point) only
* for consistency since LockManager (which is the only thing that pays attention to WriteType)
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ea0a0fd,aca99f2..88f5a0d
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@@ -1445,14 -1447,21 +1471,21 @@@ public final class Utilities
// create empty buckets if necessary
if (emptyBuckets.size() > 0) {
perfLogger.PerfLogBegin("FileSinkOperator", "CreateEmptyBuckets");
- createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
+ createEmptyBuckets(
+ hconf, emptyBuckets, conf.getCompressed(), conf.getTableInfo(), reporter);
+ filesKept.addAll(emptyBuckets);
perfLogger.PerfLogEnd("FileSinkOperator", "CreateEmptyBuckets");
}
-
// move to the file destination
- log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
+ Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
- Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+ if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR)) {
+ // HIVE-17113 - avoid copying files that may have been written to the temp dir by runaway tasks,
+ // by moving just the files we've tracked from removeTempOrDuplicateFiles().
+ Utilities.moveSpecifiedFiles(fs, tmpPath, specPath, filesKept);
+ } else {
+ Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+ }
perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
}
} else {
@@@ -1534,27 -1552,7 +1578,27 @@@
* @return a list of path names corresponding to should-be-created empty buckets.
*/
public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
- DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+ DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<Path> filesKept) throws IOException {
+ int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
+ numBuckets = (conf != null && conf.getTable() != null)
+ ? conf.getTable().getNumBuckets() : 0;
+ return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null, 0, false);
+ }
+
+ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
+ FileStatus[] items = fs.listStatus(path);
+ // remove empty directory since DP insert should not generate empty partitions.
+ // empty directories could be generated by crashed Task/ScriptOperator
+ if (items.length != 0) return false;
+ if (!fs.delete(path, true)) {
+ LOG.error("Cannot delete empty directory " + path);
+ throw new IOException("Cannot delete empty directory " + path);
+ }
+ return true;
+ }
+
+ public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
+ int dpLevels, int numBuckets, Configuration hconf, Long txnId, int stmtId, boolean isMmTable) throws IOException {
if (fileStats == null) {
return null;
}
@@@ -3145,21 -3150,14 +3235,21 @@@
LOG.info("Processing alias " + alias);
// The alias may not have any path
+ Collection<Map.Entry<Path, ArrayList<String>>> pathToAliases =
+ work.getPathToAliases().entrySet();
+ if (!skipDummy) {
+ // ConcurrentModification otherwise if adding dummy.
+ pathToAliases = new ArrayList<>(pathToAliases);
+ }
boolean isEmptyTable = true;
boolean hasLogged = false;
- // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
- for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
+ Path path = null;
+ for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) {
if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
- throw new IOException("Operation is Canceled. ");
+ throw new IOException("Operation is Canceled.");
- List<String> aliases = work.getPathToAliases().get(file);
+ Path file = e.getKey();
+ List<String> aliases = e.getValue();
if (aliases.contains(alias)) {
if (file != null) {
isEmptyTable = false;
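A self-contained sketch of the HIVE-17113 idea applied in the first Utilities.java hunk above: instead of renaming the whole temp directory, move only the files that removeTempOrDuplicateFiles() tracked, so anything a runaway task wrote to the temp dir is left behind. Class and method names are hypothetical; only the Hadoop FileSystem calls (exists, mkdirs, rename) are assumed, and the real Utilities.moveSpecifiedFiles may handle more cases such as subdirectories.

    import java.io.IOException;
    import java.util.Set;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch: move only the explicitly tracked files from tmpPath to specPath.
    final class MoveTrackedFilesSketch {
      static void moveTracked(FileSystem fs, Path tmpPath, Path specPath, Set<Path> filesKept)
          throws IOException {
        if (!fs.exists(specPath) && !fs.mkdirs(specPath)) {
          throw new IOException("Cannot create destination " + specPath);
        }
        for (Path src : filesKept) {
          Path dst = new Path(specPath, src.getName());
          // Untracked files under tmpPath are deliberately not moved.
          if (!fs.rename(src, dst)) {
            throw new IOException("Cannot move " + src + " to " + dst);
          }
        }
      }
    }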
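And a plain-JDK sketch (hypothetical names, not part of the patch) of the snapshot-before-iterate pattern used in the pathToAliases hunk above: when the loop body may add entries to the map (the dummy-file path for an empty partition), iterating a copied entry list avoids a ConcurrentModificationException.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical standalone example; names are illustrative only.
    public class SnapshotIterationSketch {
      public static void main(String[] args) {
        Map<String, List<String>> pathToAliases = new HashMap<>();
        pathToAliases.put("/warehouse/t/part=1", new ArrayList<>(Arrays.asList("a")));

        // Copy the entry set first; the loop below inserts a new key into the map,
        // which would otherwise make the map's own iterator throw ConcurrentModificationException.
        for (Map.Entry<String, List<String>> e : new ArrayList<>(pathToAliases.entrySet())) {
          if (e.getValue().contains("a")) {
            // Stands in for createDummyFileForEmptyPartition adding a dummy path mid-loop.
            pathToAliases.put("/tmp/dummy", new ArrayList<>(Arrays.asList("a")));
          }
        }
        System.out.println(pathToAliases.keySet()); // both paths present, no exception
      }
    }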
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 0000000,c944a13..2ae18cf
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@@ -1,0 -1,314 +1,316 @@@
+ /*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
+
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.Warehouse;
+ import org.apache.hadoop.hive.metastore.api.Database;
+ import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+ import org.apache.hadoop.hive.ql.exec.Task;
+ import org.apache.hadoop.hive.ql.exec.TaskFactory;
+ import org.apache.hadoop.hive.ql.exec.Utilities;
+ import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+ import org.apache.hadoop.hive.ql.metadata.HiveException;
+ import org.apache.hadoop.hive.ql.metadata.Partition;
+ import org.apache.hadoop.hive.ql.metadata.Table;
+ import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+ import org.apache.hadoop.hive.ql.parse.SemanticException;
+ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+ import org.apache.hadoop.hive.ql.plan.DDLWork;
+ import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+ import org.apache.hadoop.hive.ql.plan.MoveWork;
++import org.apache.hadoop.hive.ql.session.SessionState;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.io.IOException;
+ import java.io.Serializable;
+ import java.util.*;
+
+ import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
+ import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
+ import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString;
+
+ public class LoadPartitions {
+ private static final Logger LOG = LoggerFactory.getLogger(LoadPartitions.class);
+
+ private final Context context;
+ private final ReplLogger replLogger;
+ private final TableContext tableContext;
+ private final TableEvent event;
+ private final TaskTracker tracker;
+ private final AddPartitionDesc lastReplicatedPartition;
+
+ private final ImportTableDesc tableDesc;
+ private Table table;
+
+ public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
+ TableEvent event, String dbNameToLoadIn,
+ TableContext tableContext) throws HiveException, IOException {
+ this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null);
+ }
+
+ public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
+ TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
+ AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException {
+ this.tracker = new TaskTracker(limiter);
+ this.event = event;
+ this.context = context;
+ this.replLogger = replLogger;
+ this.lastReplicatedPartition = lastReplicatedPartition;
+ this.tableContext = tableContext;
+
+ this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn));
+ this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+ }
+
+ private String location() throws MetaException, HiveException {
+ Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+ if (!tableContext.waitOnPrecursor()) {
+ return context.warehouse.getDefaultTablePath(parentDb, tableDesc.getTableName()).toString();
+ } else {
+ Path tablePath = new Path(
+ context.warehouse.getDefaultDatabasePath(tableDesc.getDatabaseName()),
+ MetaStoreUtils.encodeTableName(tableDesc.getTableName().toLowerCase())
+ );
+ return context.warehouse.getDnsPath(tablePath).toString();
+ }
+ }
+
+ private void createTableReplLogTask() throws SemanticException {
+ ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,
+ tableDesc.getTableName(), tableDesc.tableType());
+ Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);
+
+ if (tracker.tasks().isEmpty()) {
+ tracker.addTask(replLogTask);
+ } else {
+ ReplLoadTask.dependency(tracker.tasks(), replLogTask);
+
+ List<Task<? extends Serializable>> visited = new ArrayList<>();
+ tracker.updateTaskCount(replLogTask, visited);
+ }
+ }
+
+ public TaskTracker tasks() throws SemanticException {
+ try {
+ /*
+ We are doing this both in load table and load partitions
+ */
+ if (tableDesc.getLocation() == null) {
+ tableDesc.setLocation(location());
+ }
+
+ if (table == null) {
+ //new table
+
+ table = new Table(tableDesc.getDatabaseName(), tableDesc.getTableName());
+ if (isPartitioned(tableDesc)) {
+ updateReplicationState(initialReplicationState());
+ if (!forNewTable().hasReplicationState()) {
+ // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+ createTableReplLogTask();
+ }
+ return tracker;
+ }
+ } else {
+ // existing
+
+ if (table.isPartitioned()) {
+ List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
+ if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
+ updateReplicationState(initialReplicationState());
+ if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
+ // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+ createTableReplLogTask();
+ }
+ return tracker;
+ }
+ }
+ }
+ return tracker;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ private void updateReplicationState(ReplicationState replicationState) throws SemanticException {
+ if (!tracker.canAddMoreTasks()) {
+ tracker.setReplicationState(replicationState);
+ }
+ }
+
+ private ReplicationState initialReplicationState() throws SemanticException {
+ return new ReplicationState(
+ new PartitionState(tableDesc.getTableName(), lastReplicatedPartition)
+ );
+ }
+
+ private TaskTracker forNewTable() throws Exception {
+ Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
+ while (iterator.hasNext() && tracker.canAddMoreTasks()) {
+ AddPartitionDesc addPartitionDesc = iterator.next();
+ tracker.addTask(addSinglePartition(table, addPartitionDesc));
+ if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
+ ReplicationState currentReplicationState =
+ new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
+ updateReplicationState(currentReplicationState);
+ }
+ }
+ return tracker;
+ }
+
+ /**
+ * returns the root task for adding a partition
+ */
+ private Task<? extends Serializable> addSinglePartition(Table table,
+ AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException {
+ AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
+ Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
+ Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+ partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+ LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+ + partSpec.getLocation());
+ Path tmpPath = context.utils.getExternalTmpPath(replicaWarehousePartitionLocation);
+
+ Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ event.replicationSpec(),
+ sourceWarehousePartitionLocation,
+ tmpPath,
+ context.hiveConf
+ );
+
+ Task<?> addPartTask = TaskFactory.get(
+ new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
+ context.hiveConf
+ );
+
+ Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath);
+
+ copyTask.addDependentTask(addPartTask);
+ addPartTask.addDependentTask(movePartitionTask);
+ return copyTask;
+ }
+
+ /**
+ * Creates the task that moves partition data from the temp path to its final location.
+ */
+ private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec,
+ Path tmpPath) {
+ LoadTableDesc loadTableWork = new LoadTableDesc(
+ tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
- event.replicationSpec().isReplace()
++ event.replicationSpec().isReplace(), SessionState.get().getTxnMgr().getCurrentTxnId()
+ );
+ loadTableWork.setInheritTableSpecs(false);
+ MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
+ return TaskFactory.get(work, context.hiveConf);
+ }
+
+ private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec)
+ throws MetaException, HiveException, IOException {
+ String child = Warehouse.makePartPath(partSpec.getPartSpec());
+ if (tableDesc.getLocation() == null) {
+ if (table.getDataLocation() == null) {
+ Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+ return new Path(
+ context.warehouse.getDefaultTablePath(parentDb, tableDesc.getTableName()), child);
+ } else {
+ return new Path(table.getDataLocation().toString(), child);
+ }
+ } else {
+ return new Path(tableDesc.getLocation(), child);
+ }
+ }
+
+ private Task<? extends Serializable> alterSinglePartition(AddPartitionDesc desc,
+ ReplicationSpec replicationSpec, Partition ptn) {
+ desc.setReplaceMode(true);
+ if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
+ desc.setReplicationSpec(replicationSpec);
+ }
+ desc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
+ return TaskFactory.get(
+ new DDLWork(new HashSet<>(), new HashSet<>(), desc),
+ context.hiveConf
+ );
+ }
+
+ private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception {
+ boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null);
+ ReplicationSpec replicationSpec = event.replicationSpec();
+ LOG.debug("table partitioned");
+
+ Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
+ while (iterator.hasNext()) {
+ /*
+ encounteredTheLastReplicatedPartition is set when partition task creation for a table was cut
+ short because we hit the limit on the number of tasks to create per execution. In that case,
+ on the next run we iterate over the partition descriptors until we reach the last replicated
+ partition, and resume replicating from the partition after it.
+ */
+ AddPartitionDesc addPartitionDesc = iterator.next();
+ if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) {
+ Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+ Partition ptn;
+
+ if ((ptn = context.hiveDb.getPartition(table, partSpec, false)) == null) {
+ if (!replicationSpec.isMetadataOnly()) {
+ forNewTable();
+ }
+ } else {
+ // If replicating, an already-existing partition means we may need to replace it,
+ // i.e. when the destination ptn's repl.last.id is older than the replacement's.
+ if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
+ if (replicationSpec.isMetadataOnly()) {
+ tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
+ if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
+ tracker.setReplicationState(
+ new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)
+ )
+ );
+ }
+ } else {
+ forNewTable();
+ }
+ } else {
+ // ignore this ptn, do nothing, not an error.
+ }
+ }
+ } else {
+ Map<String, String> currentSpec = addPartitionDesc.getPartition(0).getPartSpec();
+ Map<String, String> lastReplicatedPartSpec =
+ lastPartitionReplicated.getPartition(0).getPartSpec();
+ encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
+ }
+ }
+ return tracker;
+ }
+ }
+
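A minimal, self-contained sketch (plain JDK, hypothetical names) of the checkpoint-and-resume iteration that forExistingTable() implements above: skip partition descriptors until the last replicated one is found, create at most a fixed number of tasks per cycle, and record where the next cycle should resume.

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical standalone example of the resume-from-last-replicated-partition loop.
    public class ResumeSketch {
      public static void main(String[] args) {
        List<String> partitions = Arrays.asList("p=1", "p=2", "p=3", "p=4", "p=5");
        String lastReplicated = "p=2"; // null on the very first cycle
        int taskBudget = 2;            // stands in for TaskTracker.canAddMoreTasks()

        boolean reachedCheckpoint = (lastReplicated == null);
        String nextCheckpoint = null;
        int created = 0;

        for (String part : partitions) {
          if (!reachedCheckpoint) {
            // Still before the checkpoint: skip work that was already replicated.
            reachedCheckpoint = part.equals(lastReplicated);
            continue;
          }
          if (created == taskBudget) {
            // Budget exhausted: stop; the next cycle skips up to and including nextCheckpoint.
            break;
          }
          System.out.println("would add partition task for " + part);
          nextCheckpoint = part;
          created++;
        }
        System.out.println("resume after: " + nextCheckpoint); // p=4 in this run
      }
    }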