You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2018/10/07 22:47:40 UTC
[04/13] hive git commit: HIVE-20306 : Implement projection spec for
fetching only requested fields from partitions (Vihang Karajgaonkar,
reviewed by Aihua Xu, Andrew Sherman and Alexander Kolbasov)
http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
new file mode 100644
index 0000000..3905b9e
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
@@ -0,0 +1,571 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Helper utilities used by DirectSQL code in HiveMetastore.
+ */
+class MetastoreDirectSqlUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(MetastoreDirectSqlUtils.class);
+ /** Private constructor: the class only exposes static helpers and is not meant to be instantiated. */
+ private MetastoreDirectSqlUtils() {
+
+ }
+  /**
+   * Executes the given JDO query, binding positional parameters when they are supplied, and
+   * casts the result to the type the caller expects.
+   *
+   * @param query the query to execute
+   * @param params positional parameters; when null the query is executed without parameters
+   * @param sql the SQL text, used only to build the log message on failure
+   * @return the raw query result, cast unchecked to {@code T}
+   * @throws MetaException if execution fails; the original exception is logged here and only
+   *         its message is carried into the thrown exception
+   */
+  @SuppressWarnings("unchecked")
+  static <T> T executeWithArray(Query query, Object[] params, String sql) throws MetaException {
+    try {
+      if (params == null) {
+        return (T) query.execute();
+      }
+      return (T) query.executeWithArray(params);
+    } catch (Exception ex) {
+      StringBuilder message = new StringBuilder("Failed to execute [" + sql + "] with parameters [");
+      if (params != null) {
+        for (int i = 0; i < params.length; i++) {
+          if (i > 0) {
+            message.append(", ");
+          }
+          message.append(params[i]);
+        }
+      }
+      message.append("]");
+      LOG.warn(message.toString(), ex);
+      // We just logged an exception with (in case of JDO) a humongous callstack. Make a new one.
+      throw new MetaException("See previous errors; " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Checks that a raw query result is a list and returns it typed as a list of rows.
+   *
+   * @param result the object returned by a query execution; may be null
+   * @return {@code result} cast (unchecked) to a list of {@code Object[]} rows
+   * @throws MetaException if {@code result} is null or not a {@link List}
+   */
+  @SuppressWarnings("unchecked")
+  static List<Object[]> ensureList(Object result) throws MetaException {
+    if (!(result instanceof List<?>)) {
+      // instanceof is false for null, so guard the getClass() call to report it instead of
+      // failing with a NullPointerException.
+      String actualType = (result == null) ? "null" : result.getClass().toString();
+      throw new MetaException("Wrong result type " + actualType);
+    }
+    return (List<Object[]>) result;
+  }
+
+  /**
+   * Converts a numeric column value into a {@code Long}.
+   *
+   * @param obj column value; may be null
+   * @return null for a null input, otherwise the value's {@code longValue()}
+   * @throws MetaException if the value is not a {@link Number}
+   */
+  static Long extractSqlLong(Object obj) throws MetaException {
+    if (obj instanceof Number) {
+      return ((Number) obj).longValue();
+    }
+    if (obj == null) {
+      return null;
+    }
+    throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
+  }
+
+  /**
+   * Logs, at debug level, how long a direct SQL query took to execute and how long its result
+   * took to process.
+   *
+   * @param doTrace when false the method does nothing
+   * @param queryText the query text to include in the message
+   * @param start {@code System.nanoTime()} reading taken before the query was issued
+   * @param queryTime {@code System.nanoTime()} reading taken right after the query returned
+   */
+  static void timingTrace(boolean doTrace, String queryText, long start, long queryTime) {
+    if (doTrace) {
+      double queryMillis = (queryTime - start) / 1000000.0;
+      double processingMillis = (System.nanoTime() - queryTime) / 1000000.0;
+      LOG.debug("Direct SQL query in " + queryMillis + "ms + " + processingMillis
+          + "ms, the query is [" + queryText + "]");
+    }
+  }
+
+  /**
+   * Convenience overload for queries without bind parameters; delegates to the overload that
+   * accepts a parameter array, passing null for it.
+   *
+   * @return the count of results returned from the query.
+   */
+  static <T> int loopJoinOrderedResult(PersistenceManager pm, TreeMap<Long, T> tree,
+      String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
+    final Object[] noParameters = null;
+    return loopJoinOrderedResult(pm, tree, queryText, noParameters, keyIndex, func);
+  }
+  /**
+   * Merges the result of a PM SQL query into a tree of objects.
+   * Essentially it's an object join. DN could do this for us, but it issues queries
+   * separately for every object, which is suboptimal.
+   * <p>
+   * Rows are consumed in the order returned and matched against the tree's keys in ascending
+   * order, so the query must return its rows ordered by the ID column ascending.
+   *
+   * @param pm The persistence manager used to create the SQL query.
+   * @param tree The object tree, by ID.
+   * @param queryText The query text.
+   * @param parameters Positional query parameters; null or empty to execute without any.
+   * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
+   * @param func The function that is called on each (object,row) pair with the same id.
+   * @return the count of results returned from the query.
+   * @throws MetaException if the result is not a list, a row has a null or non-numeric ID, a
+   *         row's ID is smaller than the tree key being processed, or {@code func} fails.
+   */
+  static <T> int loopJoinOrderedResult(PersistenceManager pm, TreeMap<Long, T> tree,
+      String queryText, Object[] parameters, int keyIndex, ApplyFunc<T> func) throws MetaException {
+    boolean doTrace = LOG.isDebugEnabled();
+    long start = doTrace ? System.nanoTime() : 0;
+    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    int rv;
+    long queryTime;
+    try {
+      Object result;
+      if (parameters == null || parameters.length == 0) {
+        result = query.execute();
+      } else {
+        result = query.executeWithArray(parameters);
+      }
+      queryTime = doTrace ? System.nanoTime() : 0;
+      if (result == null) {
+        return 0;
+      }
+      List<Object[]> list = ensureList(result);
+      Iterator<Object[]> iter = list.iterator();
+      // Holds a row that was read but belongs to a later tree entry.
+      Object[] fields = null;
+      for (Map.Entry<Long, T> entry : tree.entrySet()) {
+        if (fields == null && !iter.hasNext()) break;
+        long id = entry.getKey();
+        while (fields != null || iter.hasNext()) {
+          if (fields == null) {
+            fields = iter.next();
+          }
+          Long nestedKey = extractSqlLong(fields[keyIndex]);
+          if (nestedKey == null) {
+            // Report a null key explicitly instead of failing on unboxing.
+            throw new MetaException("Found entries with null ID in result column " + keyIndex);
+          }
+          long nestedId = nestedKey;
+          if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId);
+          if (nestedId > id) break; // fields belong to one of the next entries
+          func.apply(entry.getValue(), fields);
+          fields = null;
+        }
+        Deadline.checkTimeout();
+      }
+      // Read the size before the query is closed.
+      rv = list.size();
+    } finally {
+      // Release the query's resources on every exit path, including failures in func.
+      query.closeAll();
+    }
+    timingTrace(doTrace, queryText, start, queryTime);
+    return rv;
+  }
+
+  /**
+   * Reads the parameter key/value rows for the given partition ids and stores them on the
+   * matching partitions, then normalizes null map values on every partition.
+   *
+   * @param PARTITION_PARAMS table name spliced into the query's from clause
+   * @param convertMapNullsToEmptyStrings passed through to {@code trimMapNulls}
+   * @param pm persistence manager used to run the SQL query
+   * @param partIds text spliced into the SQL "in (...)" clause
+   * @param partitions partitions keyed by partition id
+   */
+  static void setPartitionParameters(String PARTITION_PARAMS, boolean convertMapNullsToEmptyStrings,
+      PersistenceManager pm, String partIds, TreeMap<Long, Partition> partitions)
+      throws MetaException {
+    String queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + PARTITION_PARAMS
+        + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"PART_ID\" asc";
+    loopJoinOrderedResult(pm, partitions, queryText, 0,
+        (partition, row) -> partition.putToParameters((String) row[1], (String) row[2]));
+    // Perform conversion of null map values
+    for (Partition partition : partitions.values()) {
+      partition.setParameters(
+          MetaStoreServerUtils.trimMapNulls(partition.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  /**
+   * Same as {@code setPartitionParameters}, but optionally restricts the parameter keys with a
+   * LIKE pattern to include and/or a NOT LIKE pattern to exclude. The patterns are passed as
+   * bind parameters; a null or empty pattern adds no condition.
+   *
+   * @param PARTITION_PARAMS table name spliced into the query's from clause
+   * @param convertMapNullsToEmptyStrings passed through to {@code trimMapNulls}
+   * @param pm persistence manager used to run the SQL query
+   * @param partIds text spliced into the SQL "in (...)" clause
+   * @param partitions partitions keyed by partition id
+   * @param includeParamKeyPattern LIKE pattern keys must match; ignored when null or empty
+   * @param excludeParamKeyPattern LIKE pattern keys must not match; ignored when null or empty
+   */
+  static void setPartitionParametersWithFilter(String PARTITION_PARAMS,
+      boolean convertMapNullsToEmptyStrings, PersistenceManager pm, String partIds,
+      TreeMap<Long, Partition> partitions, String includeParamKeyPattern, String excludeParamKeyPattern)
+      throws MetaException {
+    StringBuilder sql = new StringBuilder("select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from ");
+    sql.append(PARTITION_PARAMS);
+    sql.append(" where \"PART_ID\" in (");
+    sql.append(partIds);
+    sql.append(") and \"PARAM_KEY\" is not null");
+
+    List<Object> bindValues = new ArrayList<>(2);
+    boolean hasIncludePattern = includeParamKeyPattern != null && !includeParamKeyPattern.isEmpty();
+    if (hasIncludePattern) {
+      sql.append(" and \"PARAM_KEY\" LIKE (?)");
+      bindValues.add(includeParamKeyPattern);
+    }
+    boolean hasExcludePattern = excludeParamKeyPattern != null && !excludeParamKeyPattern.isEmpty();
+    if (hasExcludePattern) {
+      sql.append(" and \"PARAM_KEY\" NOT LIKE (?)");
+      bindValues.add(excludeParamKeyPattern);
+    }
+    sql.append(" order by \"PART_ID\" asc");
+
+    loopJoinOrderedResult(pm, partitions, sql.toString(), bindValues.toArray(), 0,
+        (partition, row) -> partition.putToParameters((String) row[1], (String) row[2]));
+    // Perform conversion of null map values
+    for (Partition partition : partitions.values()) {
+      partition.setParameters(
+          MetaStoreServerUtils.trimMapNulls(partition.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  /**
+   * Reads the partition key values for the given partition ids, ordered by partition id and
+   * value index, and appends each value to the matching partition.
+   *
+   * @param PARTITION_KEY_VALS table name spliced into the query's from clause
+   * @param pm persistence manager used to run the SQL query
+   * @param partIds text spliced into the SQL "in (...)" clause
+   * @param partitions partitions keyed by partition id
+   */
+  static void setPartitionValues(String PARTITION_KEY_VALS, PersistenceManager pm, String partIds,
+      TreeMap<Long, Partition> partitions)
+      throws MetaException {
+    String queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from " + PARTITION_KEY_VALS
+        + " where \"PART_ID\" in (" + partIds + ")"
+        + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, partitions, queryText, 0,
+        (partition, row) -> partition.addToValues((String) row[1]));
+  }
+
+  /**
+   * Converts a column value that may be a {@link Clob} into a String.
+   *
+   * @param value column value; may be null
+   * @return null for a null input; for a Clob its content, truncated to at most
+   *         {@code Integer.MAX_VALUE - 2} characters; otherwise {@code value.toString()}.
+   *         Also returns null when reading the Clob fails (the failure is logged).
+   */
+  static String extractSqlClob(Object value) {
+    if (value == null) {
+      return null;
+    }
+    if (!(value instanceof Clob)) {
+      return value.toString();
+    }
+    Clob clob = (Clob) value;
+    try {
+      // we trim the Clob value to a max length an int can hold
+      int maxLength = (int) Math.min(clob.length(), (long) (Integer.MAX_VALUE - 2));
+      return clob.getSubString(1L, maxLength);
+    } catch (SQLException sqle) {
+      // Best effort: callers treat an unreadable value as null, but leave a trace of why.
+      LOG.warn("Failed to read CLOB column value; returning null", sqle);
+      return null;
+    }
+  }
+
+  /**
+   * Reads the parameter key/value rows for the given storage descriptor ids and stores them on
+   * the matching descriptors, then normalizes null map values on every descriptor.
+   *
+   * @param SD_PARAMS table name spliced into the query's from clause
+   * @param convertMapNullsToEmptyStrings passed through to {@code trimMapNulls}
+   * @param pm persistence manager used to run the SQL query
+   * @param sds storage descriptors keyed by SD id
+   * @param sdIds text spliced into the SQL "in (...)" clause
+   */
+  static void setSDParameters(String SD_PARAMS, boolean convertMapNullsToEmptyStrings,
+      PersistenceManager pm, TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SD_PARAMS
+        + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"SD_ID\" asc";
+    loopJoinOrderedResult(pm, sds, queryText, 0,
+        (sd, row) -> sd.putToParameters((String) row[1], extractSqlClob(row[2])));
+    // Perform conversion of null map values
+    for (StorageDescriptor sd : sds.values()) {
+      sd.setParameters(
+          MetaStoreServerUtils.trimMapNulls(sd.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  /**
+   * Returns the {@code intValue()} of a numeric column value.
+   *
+   * @param field column value; must be a non-null {@link Number} (no check is made here)
+   * @return the value narrowed to an int
+   */
+  static int extractSqlInt(Object field) {
+    Number number = (Number) field;
+    return number.intValue();
+  }
+
+  /**
+   * Reads the requested sort-column fields for the given storage descriptor ids and appends
+   * one {@code Order} per row to the matching descriptor.
+   * <p>
+   * Only the columns named in {@code columnNames} are selected. Each selected column's position
+   * in the result row is tracked individually, so any subset ("col", "order" or both) is read
+   * from the right position.
+   *
+   * @param SORT_COLS table name spliced into the query's from clause
+   * @param columnNames requested fields; "col" selects COLUMN_NAME, "order" selects ORDER
+   * @param pm persistence manager used to run the SQL query
+   * @param sds storage descriptors keyed by SD id
+   * @param sdIds text spliced into the SQL "in (...)" clause
+   */
+  static void setSDSortCols(String SORT_COLS, List<String> columnNames, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    StringBuilder queryTextBuilder = new StringBuilder("select \"SD_ID\"");
+    // Column 0 is always SD_ID; every requested column takes the next free position.
+    int nextIndex = 1;
+    final int colIndex;
+    if (columnNames.contains("col")) {
+      colIndex = nextIndex++;
+      queryTextBuilder.append(", \"COLUMN_NAME\"");
+    } else {
+      colIndex = -1;
+    }
+    final int orderIndex;
+    if (columnNames.contains("order")) {
+      orderIndex = nextIndex++;
+      queryTextBuilder.append(", \"ORDER\"");
+    } else {
+      orderIndex = -1;
+    }
+    queryTextBuilder
+        .append(" from ")
+        .append(SORT_COLS)
+        .append(" where \"SD_ID\" in (")
+        .append(sdIds)
+        .append(") order by \"SD_ID\" asc, \"INTEGER_IDX\" asc");
+    String queryText = queryTextBuilder.toString();
+    loopJoinOrderedResult(pm, sds, queryText, 0, (sd, fields) -> {
+      // Rows with a null ORDER are skipped, as in the non-projecting overload.
+      if (orderIndex > 0 && fields[orderIndex] == null) {
+        return;
+      }
+      Order order = new Order();
+      if (colIndex > 0) {
+        order.setCol((String) fields[colIndex]);
+      }
+      if (orderIndex > 0) {
+        order.setOrder(extractSqlInt(fields[orderIndex]));
+      }
+      sd.addToSortCols(order);
+    });
+  }
+
+  /**
+   * Reads column name and order of every sort column for the given storage descriptor ids and
+   * appends one {@code Order} per row to the matching descriptor. Rows whose ORDER value is
+   * null are skipped.
+   *
+   * @param SORT_COLS table name spliced into the query (from clause and ORDER qualifier)
+   * @param pm persistence manager used to run the SQL query
+   * @param sds storage descriptors keyed by SD id
+   * @param sdIds text spliced into the SQL "in (...)" clause
+   */
+  static void setSDSortCols(String SORT_COLS, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText = "select \"SD_ID\", \"COLUMN_NAME\", " + SORT_COLS + ".\"ORDER\""
+        + " from " + SORT_COLS
+        + " where \"SD_ID\" in (" + sdIds + ")"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, sds, queryText, 0, (sd, row) -> {
+      if (row[2] != null) {
+        sd.addToSortCols(new Order((String) row[1], extractSqlInt(row[2])));
+      }
+    });
+  }
+
+  /**
+   * Reads the bucketing column names for the given storage descriptor ids, ordered by SD id
+   * and column index, and appends each name to the matching descriptor.
+   *
+   * @param BUCKETING_COLS table name spliced into the query's from clause
+   * @param pm persistence manager used to run the SQL query
+   * @param sds storage descriptors keyed by SD id
+   * @param sdIds text spliced into the SQL "in (...)" clause
+   */
+  static void setSDBucketCols(String BUCKETING_COLS, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from " + BUCKETING_COLS
+        + " where \"SD_ID\" in (" + sdIds + ")"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, sds, queryText, 0,
+        (sd, row) -> sd.addToBucketCols((String) row[1]));
+  }
+
+  /**
+   * Reads the skewed column names for the given storage descriptor ids and appends each name
+   * to the matching descriptor's {@code SkewedInfo}, creating that object when it is not set.
+   *
+   * @param SKEWED_COL_NAMES table name spliced into the query's from clause
+   * @param pm persistence manager used to run the SQL query
+   * @param sds storage descriptors keyed by SD id
+   * @param sdIds text spliced into the SQL "in (...)" clause
+   * @return true if the query returned at least one row
+   */
+  static boolean setSkewedColNames(String SKEWED_COL_NAMES, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from " + SKEWED_COL_NAMES
+        + " where \"SD_ID\" in (" + sdIds + ")"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+    int rowCount = loopJoinOrderedResult(pm, sds, queryText, 0, (sd, row) -> {
+      if (!sd.isSetSkewedInfo()) {
+        sd.setSkewedInfo(new SkewedInfo());
+      }
+      sd.getSkewedInfo().addToSkewedColNames((String) row[1]);
+    });
+    return rowCount > 0;
+  }
+
+ /**
+ * Loads the skewed column value lists for the given storage descriptors and appends them,
+ * one list per STRING_LIST_ID, to each descriptor's SkewedInfo (created here when not set).
+ *
+ * @param SKEWED_STRING_LIST_VALUES table name spliced into the query (holds the list values)
+ * @param SKEWED_VALUES table name spliced into the query (links SD ids to value lists)
+ * @param pm persistence manager used to run the SQL query
+ * @param sds storage descriptors keyed by SD id
+ * @param sdIds text spliced into the SQL "in (...)" clause
+ * @throws MetaException propagated from loopJoinOrderedResult or extractSqlLong
+ */
+ static void setSkewedColValues(String SKEWED_STRING_LIST_VALUES, String SKEWED_VALUES,
+ PersistenceManager pm, TreeMap<Long, StorageDescriptor> sds, String sdIds)
+ throws MetaException {
+ String queryText;
+ // Rows are ordered by SD id, then list position, then value position, so the values of one
+ // list arrive on consecutive rows.
+ queryText =
+ "select " + SKEWED_VALUES + ".\"SD_ID_OID\","
+ + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\","
+ + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
+ + "from " + SKEWED_VALUES + " "
+ + " left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_VALUES + "."
+ + "\"STRING_LIST_ID_EID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
+ + "where " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ") "
+ + " and " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" is not null "
+ + " and " + SKEWED_VALUES + ".\"INTEGER_IDX\" >= 0 "
+ + "order by " + SKEWED_VALUES + ".\"SD_ID_OID\" asc, " + SKEWED_VALUES + ".\"INTEGER_IDX\" asc,"
+ + " " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
+ loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+ // Id and contents of the list currently being filled; kept across apply() calls.
+ private Long currentListId;
+ private List<String> currentList;
+ @Override
+ public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
+ if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+ // Note that this is not a typical list accumulator - there's no call to finalize
+ // the last list. Instead we add list to SD first, as well as locally to add elements.
+ if (fields[1] == null) {
+ currentList = null; // left outer join produced a list with no values
+ currentListId = null;
+ t.getSkewedInfo().addToSkewedColValues(Collections.<String>emptyList());
+ } else {
+ long fieldsListId = extractSqlLong(fields[1]);
+ // A change of list id starts a new list, which is registered on the SD before it is filled.
+ if (currentListId == null || fieldsListId != currentListId) {
+ currentList = new ArrayList<String>();
+ currentListId = fieldsListId;
+ t.getSkewedInfo().addToSkewedColValues(currentList);
+ }
+ currentList.add((String)fields[2]);
+ }
+ }});
+ }
+
+  /**
+   * Loads the skewed value-list to location mappings for the given storage descriptors and
+   * stores them in each descriptor's {@code SkewedInfo}.
+   * <p>
+   * The {@code SkewedInfo} and its location map are created here when the descriptor does not
+   * have them yet (for example when only the skewed column names were populated before).
+   *
+   * @param SKEWED_COL_VALUE_LOC_MAP table name spliced into the query (SD id, list id, location)
+   * @param SKEWED_STRING_LIST_VALUES table name spliced into the query (holds the list values)
+   * @param pm persistence manager used to run the SQL query
+   * @param sds storage descriptors keyed by SD id
+   * @param sdIds text spliced into the SQL "in (...)" clause
+   * @throws MetaException propagated from loopJoinOrderedResult or extractSqlLong
+   */
+  static void setSkewedColLocationMaps(String SKEWED_COL_VALUE_LOC_MAP,
+      String SKEWED_STRING_LIST_VALUES, PersistenceManager pm, TreeMap<Long, StorageDescriptor> sds,
+      String sdIds)
+      throws MetaException {
+    String queryText;
+    // STRING_LIST_ID is quoted like every other identifier in this query so that databases
+    // which fold unquoted identifiers to another case resolve the same column.
+    queryText =
+        "select " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\","
+        + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\","
+        + " " + SKEWED_COL_VALUE_LOC_MAP + ".\"LOCATION\","
+        + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
+        + "from " + SKEWED_COL_VALUE_LOC_MAP + ""
+        + " left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_COL_VALUE_LOC_MAP + "."
+        + "\"STRING_LIST_ID_KID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
+        + "where " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" in (" + sdIds + ")"
+        + " and " + SKEWED_COL_VALUE_LOC_MAP + ".\"STRING_LIST_ID_KID\" is not null "
+        + "order by " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" asc,"
+        + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" asc,"
+        + " " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
+
+    loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      // Id and contents of the key list currently being filled; kept across apply() calls.
+      private Long currentListId;
+      private List<String> currentList;
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
+        if (!t.isSetSkewedInfo()) {
+          SkewedInfo skewedInfo = new SkewedInfo();
+          skewedInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+          t.setSkewedInfo(skewedInfo);
+        }
+        Map<List<String>, String> skewMap = t.getSkewedInfo().getSkewedColValueLocationMaps();
+        if (skewMap == null) {
+          // SkewedInfo was created elsewhere without a location map; create it instead of
+          // failing with a NullPointerException on the put below.
+          skewMap = new HashMap<List<String>, String>();
+          t.getSkewedInfo().setSkewedColValueLocationMaps(skewMap);
+        }
+        // Note that this is not a typical list accumulator - there's no call to finalize
+        // the last list. Instead we add list to SD first, as well as locally to add elements.
+        if (fields[1] == null) {
+          currentList = new ArrayList<String>(); // left outer join produced a list with no values
+          currentListId = null;
+        } else {
+          long fieldsListId = extractSqlLong(fields[1]);
+          if (currentListId == null || fieldsListId != currentListId) {
+            currentList = new ArrayList<String>();
+            currentListId = fieldsListId;
+          } else {
+            // The list is the map key and is about to change, so drop the entry stored under
+            // its current contents before mutating it.
+            skewMap.remove(currentList); // value based compare.. remove first
+          }
+          currentList.add((String)fields[3]);
+        }
+        skewMap.put(currentList, (String)fields[2]);
+      }});
+  }
+
+  /**
+   * Reads the requested column fields for the given column descriptor ids and appends one
+   * {@code FieldSchema} per row to the matching list.
+   * <p>
+   * Only the columns named in {@code columnNames} are selected. Each selected column's position
+   * in the result row is tracked individually, so any subset of "name", "type" and "comment"
+   * is read from the right position.
+   *
+   * @param COLUMNS_V2 table name spliced into the query's from clause
+   * @param columnNames requested fields; "name" selects COLUMN_NAME, "type" selects TYPE_NAME,
+   *        "comment" selects COMMENT
+   * @param pm persistence manager used to run the SQL query
+   * @param colss column lists keyed by CD id
+   * @param colIds text spliced into the SQL "in (...)" clause
+   */
+  static void setSDCols(String COLUMNS_V2, List<String> columnNames, PersistenceManager pm,
+      TreeMap<Long, List<FieldSchema>> colss, String colIds)
+      throws MetaException {
+    StringBuilder queryTextBuilder = new StringBuilder("select \"CD_ID\"");
+    // Column 0 is always CD_ID; every requested column takes the next free position.
+    int nextIndex = 1;
+    final int nameIndex;
+    if (columnNames.contains("name")) {
+      nameIndex = nextIndex++;
+      queryTextBuilder.append(", \"COLUMN_NAME\"");
+    } else {
+      nameIndex = -1;
+    }
+    final int typeIndex;
+    if (columnNames.contains("type")) {
+      typeIndex = nextIndex++;
+      queryTextBuilder.append(", \"TYPE_NAME\"");
+    } else {
+      typeIndex = -1;
+    }
+    final int commentIndex;
+    if (columnNames.contains("comment")) {
+      commentIndex = nextIndex++;
+      queryTextBuilder.append(", \"COMMENT\"");
+    } else {
+      commentIndex = -1;
+    }
+    queryTextBuilder
+        .append(" from ")
+        .append(COLUMNS_V2)
+        .append(" where \"CD_ID\" in (")
+        .append(colIds)
+        .append(") order by \"CD_ID\" asc, \"INTEGER_IDX\" asc");
+    String queryText = queryTextBuilder.toString();
+    loopJoinOrderedResult(pm, colss, queryText, 0, (cols, fields) -> {
+      FieldSchema fieldSchema = new FieldSchema();
+      if (nameIndex > 0) {
+        fieldSchema.setName((String) fields[nameIndex]);
+      }
+      if (typeIndex > 0) {
+        fieldSchema.setType(extractSqlClob(fields[typeIndex]));
+      }
+      if (commentIndex > 0) {
+        fieldSchema.setComment((String) fields[commentIndex]);
+      }
+      cols.add(fieldSchema);
+    });
+  }
+
+  /**
+   * Reads name, type and comment of every column for the given column descriptor ids, ordered
+   * by CD id and column index, and appends one {@code FieldSchema} per row to the matching list.
+   *
+   * @param COLUMNS_V2 table name spliced into the query's from clause
+   * @param pm persistence manager used to run the SQL query
+   * @param colss column lists keyed by CD id
+   * @param colIds text spliced into the SQL "in (...)" clause
+   */
+  static void setSDCols(String COLUMNS_V2, PersistenceManager pm,
+      TreeMap<Long, List<FieldSchema>> colss, String colIds)
+      throws MetaException {
+    String queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\""
+        + " from " + COLUMNS_V2 + " where \"CD_ID\" in (" + colIds + ")"
+        + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, colss, queryText, 0, (cols, row) -> {
+      // Row layout: [0] CD_ID, [1] COMMENT, [2] COLUMN_NAME, [3] TYPE_NAME
+      String comment = (String) row[1];
+      String name = (String) row[2];
+      String type = extractSqlClob(row[3]);
+      cols.add(new FieldSchema(name, type, comment));
+    });
+  }
+
+  /**
+   * Reads the parameter key/value rows for the given serde ids and stores them on the matching
+   * {@code SerDeInfo} objects, then normalizes null map values on every one of them.
+   *
+   * @param SERDE_PARAMS table name spliced into the query's from clause
+   * @param convertMapNullsToEmptyStrings passed through to {@code trimMapNulls}
+   * @param pm persistence manager used to run the SQL query
+   * @param serdes serde infos keyed by serde id
+   * @param serdeIds text spliced into the SQL "in (...)" clause
+   */
+  static void setSerdeParams(String SERDE_PARAMS, boolean convertMapNullsToEmptyStrings,
+      PersistenceManager pm, TreeMap<Long, SerDeInfo> serdes, String serdeIds) throws MetaException {
+    String queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SERDE_PARAMS
+        + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"SERDE_ID\" asc";
+    loopJoinOrderedResult(pm, serdes, queryText, 0,
+        (serde, row) -> serde.putToParameters((String) row[1], extractSqlClob(row[2])));
+    // Perform conversion of null map values
+    for (SerDeInfo serde : serdes.values()) {
+      serde.setParameters(
+          MetaStoreServerUtils.trimMapNulls(serde.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  /**
+   * Turns a boolean-like column value into a {@code Boolean}.
+   * Some databases return a real boolean (e.g. MySQL), others a 'Y'/'N' string (e.g. Derby).
+   *
+   * @param value
+   *          column value from the database; may be null
+   * @return null for a null column value, the value itself when it already is a Boolean,
+   *         otherwise the result of mapping the string "Y"/"N" to true/false
+   * @throws MetaException
+   *           if the value is neither a Boolean nor a string that can be converted
+   */
+  static Boolean extractSqlBoolean(Object value) throws MetaException {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof Boolean) {
+      return (Boolean) value;
+    }
+    if (value instanceof String) {
+      String text = (String) value;
+      try {
+        return BooleanUtils.toBooleanObject(text, "Y", "N", null);
+      } catch (IllegalArgumentException ignored) {
+        // Unrecognized string: fall through to the MetaException below.
+      }
+    }
+    throw new MetaException("Cannot extract boolean from column value " + value);
+  }
+
+  /**
+   * Returns the string form of a column value.
+   *
+   * @param value column value; may be null
+   * @return null for a null input, otherwise {@code value.toString()}
+   */
+  static String extractSqlString(Object value) {
+    return (value == null) ? null : value.toString();
+  }
+
+  /**
+   * Converts a numeric column value into a {@code Double}.
+   *
+   * @param obj column value; may be null
+   * @return null for a null input, otherwise the value's {@code doubleValue()}
+   * @throws MetaException if the value is not a {@link Number}
+   */
+  static Double extractSqlDouble(Object obj) throws MetaException {
+    if (obj instanceof Number) {
+      return ((Number) obj).doubleValue();
+    }
+    if (obj == null) {
+      return null;
+    }
+    throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
+  }
+
+  /**
+   * Extracts the bytes of a binary column value.
+   *
+   * @param value column value; may be null
+   * @return null for a null input; the content of a {@link Blob}; the array itself for a
+   *         {@code byte[]}; null (with a debug log entry) for any other type
+   * @throws MetaException if reading the {@link Blob} fails; the {@link SQLException} is kept
+   *         as the cause and its message is appended
+   */
+  static byte[] extractSqlBlob(Object value) throws MetaException {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof Blob) {
+      //derby, oracle
+      Blob blob = (Blob) value;
+      try {
+        // getBytes function says: pos the ordinal position of the first byte in
+        // the BLOB value to be extracted; the first byte is at position 1
+        return blob.getBytes(1, (int) blob.length());
+      } catch (SQLException e) {
+        // Keep the underlying failure attached instead of discarding it.
+        MetaException metaException =
+            new MetaException("Encounter error while processing blob: " + e.getMessage());
+        metaException.initCause(e);
+        throw metaException;
+      }
+    }
+    if (value instanceof byte[]) {
+      // mysql, postgres, sql server
+      return (byte[]) value;
+    }
+    // this may happen when enablebitvector is false
+    LOG.debug("Expected blob type but got " + value.getClass().getName());
+    return null;
+  }
+
+  /**
+   * Callback invoked by {@code loopJoinOrderedResult} for each (object, row) pair that share
+   * the same id.
+   *
+   * @param <T> type of the object handed to the callback
+   */
+  @FunctionalInterface
+  interface ApplyFunc<T> {
+    void apply(T t, Object[] fields) throws MetaException;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 5372714..66977d7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -69,6 +69,7 @@ import javax.jdo.datastore.JDOConnection;
import javax.jdo.identity.IntIdentity;
import javax.sql.DataSource;
+import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import org.apache.commons.collections.CollectionUtils;
@@ -2031,8 +2032,11 @@ public class ObjectStore implements RawStore, Configurable {
return keys;
}
- private SerDeInfo convertToSerDeInfo(MSerDeInfo ms) throws MetaException {
+ private SerDeInfo convertToSerDeInfo(MSerDeInfo ms, boolean allowNull) throws MetaException {
if (ms == null) {
+ if (allowNull) {
+ return null;
+ }
throw new MetaException("Invalid SerDeInfo object");
}
SerDeInfo serde =
@@ -2086,7 +2090,7 @@ public class ObjectStore implements RawStore, Configurable {
StorageDescriptor sd = new StorageDescriptor(noFS ? null : convertToFieldSchemas(mFieldSchemas),
msd.getLocation(), msd.getInputFormat(), msd.getOutputFormat(), msd
.isCompressed(), msd.getNumBuckets(), convertToSerDeInfo(msd
- .getSerDeInfo()), convertList(msd.getBucketCols()), convertToOrders(msd
+ .getSerDeInfo(), true), convertList(msd.getBucketCols()), convertToOrders(msd
.getSortCols()), convertMap(msd.getParameters()));
SkewedInfo skewedInfo = new SkewedInfo(convertList(msd.getSkewedColNames()),
convertToSkewedValues(msd.getSkewedColValues()),
@@ -2607,11 +2611,17 @@ public class ObjectStore implements RawStore, Configurable {
if (mpart == null) {
return null;
}
- Partition p = new Partition(convertList(mpart.getValues()), mpart.getTable().getDatabase()
- .getName(), mpart.getTable().getTableName(), mpart.getCreateTime(),
+ // it's possible that the MPartition is only partially filled, so do null checks to avoid an NPE
+ MTable table = mpart.getTable();
+ String dbName =
+ table == null ? null : table.getDatabase() == null ? null : table.getDatabase().getName();
+ String tableName = table == null ? null : table.getTableName();
+ String catName = table == null ? null :
+ table.getDatabase() == null ? null : table.getDatabase().getCatalogName();
+ Partition p = new Partition(convertList(mpart.getValues()), dbName, tableName, mpart.getCreateTime(),
mpart.getLastAccessTime(), convertToStorageDescriptor(mpart.getSd()),
convertMap(mpart.getParameters()));
- p.setCatName(mpart.getTable().getDatabase().getCatalogName());
+ p.setCatName(catName);
p.setWriteId(mpart.getWriteId());
return p;
}
@@ -3349,6 +3359,64 @@ public class ObjectStore implements RawStore, Configurable {
return mparts;
}
+ // This code is only executed in JDO code path, not from direct SQL code path.
+ //
+ // Lists the MPartitions of a table, ordered by partition name. When fieldNames is null or
+ // empty the full objects are fetched; otherwise only the named fields are selected through
+ // the query's result clause and copied into freshly created, partially filled MPartition
+ // objects. A non-negative max limits the number of partitions via the query range.
+ // The query is stored in queryWrapper.query; this method does not close it.
+ private List<MPartition> listMPartitionsWithProjection(String catName, String dbName, String tblName, int max,
+ QueryWrapper queryWrapper, List<String> fieldNames) throws MetaException {
+ boolean success = false;
+ List<MPartition> mparts = null;
+ try {
+ openTransaction();
+ LOG.debug("Executing listMPartitionsWithProjection");
+ // NOTE(review): catName is not normalized here, unlike dbName and tblName - confirm callers pass it normalized
+ dbName = normalizeIdentifier(dbName);
+ tblName = normalizeIdentifier(tblName);
+ Query query = queryWrapper.query = pm.newQuery(MPartition.class,
+ "table.tableName == t1 && table.database.name == t2 && table.database.catalogName == t3");
+ query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.String t3");
+ query.setOrdering("partitionName ascending");
+ if (max >= 0) {
+ query.setRange(0, max);
+ }
+ if (fieldNames == null || fieldNames.isEmpty()) {
+ // full fetch of partitions
+ mparts = (List<MPartition>) query.execute(tblName, dbName, catName);
+ pm.retrieveAll(mparts);
+ } else {
+ // fetch partially filled partitions using result clause
+ query.setResult(Joiner.on(',').join(fieldNames));
+ // if more than one field is in the result clause, the return type is List<Object[]>
+ if (fieldNames.size() > 1) {
+ List<Object[]> results = (List<Object[]>) query.execute(tblName, dbName, catName);
+ mparts = new ArrayList<>(results.size());
+ for (Object[] row : results) {
+ MPartition mpart = new MPartition();
+ // row values are in the same order as fieldNames
+ int i = 0;
+ for (Object val : row) {
+ MetaStoreServerUtils.setNestedProperty(mpart, fieldNames.get(i), val, true);
+ i++;
+ }
+ mparts.add(mpart);
+ }
+ } else {
+ // only one field is requested, return type is List<Object>
+ List<Object> results = (List<Object>) query.execute(tblName, dbName, catName);
+ mparts = new ArrayList<>(results.size());
+ for (Object row : results) {
+ MPartition mpart = new MPartition();
+ MetaStoreServerUtils.setNestedProperty(mpart, fieldNames.get(0), row, true);
+ mparts.add(mpart);
+ }
+ }
+ }
+ success = commitTransaction();
+ LOG.debug("Done retrieving {} objects for listMPartitionsWithProjection", mparts.size());
+ } finally {
+ // roll back if anything above threw or the commit did not succeed
+ if (!success) {
+ rollbackTransaction();
+ }
+ }
+ return mparts;
+ }
+
@Override
public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
List<String> partNames) throws MetaException, NoSuchObjectException {
@@ -3492,7 +3560,6 @@ public class ObjectStore implements RawStore, Configurable {
return results;
}
-
private Integer getNumPartitionsViaOrmFilter(Table table, ExpressionTree tree, boolean isValidatedFilter)
throws MetaException {
Map<String, Object> params = new HashMap<>();
@@ -3620,17 +3687,23 @@ public class ObjectStore implements RawStore, Configurable {
private boolean doUseDirectSql;
private long start;
private Table table;
+ protected final List<String> partitionFields;
protected final String catName, dbName, tblName;
private boolean success = false;
protected T results = null;
public GetHelper(String catalogName, String dbName, String tblName,
- boolean allowSql, boolean allowJdo)
- throws MetaException {
+ boolean allowSql, boolean allowJdo) throws MetaException {
+ this(catalogName, dbName, tblName, null, allowSql, allowJdo);
+ }
+
+ public GetHelper(String catalogName, String dbName, String tblName,
+ List<String> fields, boolean allowSql, boolean allowJdo) throws MetaException {
assert allowSql || allowJdo;
this.allowJdo = allowJdo;
this.catName = (catalogName != null) ? normalizeIdentifier(catalogName) : null;
this.dbName = (dbName != null) ? normalizeIdentifier(dbName) : null;
+ this.partitionFields = fields;
if (tblName != null) {
this.tblName = normalizeIdentifier(tblName);
} else {
@@ -3813,7 +3886,12 @@ public class ObjectStore implements RawStore, Configurable {
private abstract class GetListHelper<T> extends GetHelper<List<T>> {
public GetListHelper(String catName, String dbName, String tblName, boolean allowSql,
boolean allowJdo) throws MetaException {
- super(catName, dbName, tblName, allowSql, allowJdo);
+ super(catName, dbName, tblName, null, allowSql, allowJdo);
+ }
+
+ public GetListHelper(String catName, String dbName, String tblName, List<String> fields,
+ boolean allowSql, boolean allowJdo) throws MetaException {
+ super(catName, dbName, tblName, fields, allowSql, allowJdo);
}
@Override
@@ -3961,6 +4039,44 @@ public class ObjectStore implements RawStore, Configurable {
}.run(true);
}
+ /**
+ * Returns the partitions of a table with only the requested fields populated.
+ * When fieldList is null or empty this delegates to getPartitionsInternal and returns all
+ * fields. Otherwise the request goes through a GetListHelper with both the direct SQL and
+ * the JDO paths allowed. The include/exclude parameter key patterns are only passed to the
+ * direct SQL path; the JDO path does not apply them yet (see the TODO below).
+ */
+ @Override
+ public List<Partition> getPartitionSpecsByFilterAndProjection(String catName, String dbName,
+ String tblName, List<String> fieldList,
+ String includeParamKeyPattern,
+ String excludeParamKeyPattern)
+ throws MetaException, NoSuchObjectException {
+ if (fieldList == null || fieldList.isEmpty()) {
+ // no fields are requested. Fallback to regular getPartitions implementation to return all the fields
+ return getPartitionsInternal(catName, dbName, tblName, -1, true, true);
+ }
+
+ return new GetListHelper<Partition>(catName, dbName, tblName,
+ fieldList, true, true) {
+
+ @Override
+ protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx) throws MetaException {
+ return directSql
+ .getPartitionSpecsUsingProjection(ctx.getTable(), ctx.partitionFields, includeParamKeyPattern,
+ excludeParamKeyPattern);
+ }
+
+ @Override
+ protected List<Partition> getJdoResult(
+ GetHelper<List<Partition>> ctx) throws MetaException {
+ // For single-valued fields we can use setResult() to implement projection of fields but
+ // JDO doesn't support multi-valued fields in setResult() so currently JDO implementation
+ // falls back to full-partition fetch if the requested fields contain multi-valued fields
+ // TODO: Add param filtering logic
+ List<String> fieldNames = PartitionProjectionEvaluator.getMPartitionFieldNames(ctx.partitionFields);
+ // the QueryWrapper is closed when this block exits, after the results have been converted
+ try (QueryWrapper queryWrapper = new QueryWrapper()) {
+ return convertToParts(
+ listMPartitionsWithProjection(catName, dbName, tblName, -1, queryWrapper, fieldNames));
+ }
+ }
+ }.run(true);
+
+ }
+
/**
* Gets the table object for a given table, throws if anything goes wrong.
* @param dbName Database name.
@@ -4403,7 +4519,7 @@ public class ObjectStore implements RawStore, Configurable {
}
oldSd.setBucketCols(newSd.getBucketCols());
- oldSd.setCompressed(newSd.isCompressed());
+ oldSd.setIsCompressed(newSd.isCompressed());
oldSd.setInputFormat(newSd.getInputFormat());
oldSd.setOutputFormat(newSd.getOutputFormat());
oldSd.setNumBuckets(newSd.getNumBuckets());
@@ -11120,7 +11236,7 @@ public class ObjectStore implements RawStore, Configurable {
if (mSerDeInfo == null) {
throw new NoSuchObjectException("No SerDe named " + serDeName);
}
- SerDeInfo serde = convertToSerDeInfo(mSerDeInfo);
+ SerDeInfo serde = convertToSerDeInfo(mSerDeInfo, false);
committed = commitTransaction();
return serde;
} finally {
@@ -11238,7 +11354,7 @@ public class ObjectStore implements RawStore, Configurable {
schemaVersion.setName(mSchemaVersion.getName());
}
if (mSchemaVersion.getSerDe() != null) {
- schemaVersion.setSerDe(convertToSerDeInfo(mSchemaVersion.getSerDe()));
+ schemaVersion.setSerDe(convertToSerDeInfo(mSchemaVersion.getSerDe(), false));
}
return schemaVersion;
}