You are viewing a plain-text version of this content; the canonical (HTML) version was linked from the original page.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/06/09 20:11:21 UTC
hive git commit: HIVE-19794 : Disable removing order by from subquery in GenericUDTFGetSplits
Repository: hive
Updated Branches:
refs/heads/master 2493e4a41 -> 95ea9f535
HIVE-19794 : Disable removing order by from subquery in GenericUDTFGetSplits (Prasanth J via Jason Dere)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95ea9f53
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95ea9f53
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95ea9f53
Branch: refs/heads/master
Commit: 95ea9f535f0439391b7b8ddc411f5f097d6dc818
Parents: 2493e4a
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Sat Jun 9 13:10:48 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Jun 9 13:10:48 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../hive/jdbc/TestJdbcGenericUDTFGetSplits.java | 187 +++++++++++++++++++
.../hive/ql/exec/tez/HiveSplitGenerator.java | 30 ++-
.../ql/udf/generic/GenericUDTFGetSplits.java | 31 +--
4 files changed, 235 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b5e2d86..87db42a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4172,6 +4172,11 @@ public class HiveConf extends Configuration {
LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT("hive.llap.external.splits.temp.table.storage.format",
"orc", new StringSet("default", "text", "orc"),
"Storage format for temp tables created using LLAP external client"),
+    LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT("hive.llap.external.splits.order.by.force.single.split",
+      true,
+      "If an LLAP external client submits an ORDER BY query, force returning a single split so that data\n" +
+      "is read out in order. Setting this to false lets external clients read data out in parallel,\n" +
+      "losing the ordering (external clients are then responsible for guaranteeing the ordering)."),
LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
"Override if grace join should be allowed to run in llap."),
http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
new file mode 100644
index 0000000..c8a428c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+import org.apache.hadoop.hive.ql.wm.Action;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.Expression;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestJdbcGenericUDTFGetSplits {
+  protected static MiniHS2 miniHS2 = null;
+  protected static String dataFileDir;
+  static Path kvDataFilePath;
+  protected static String tableName = "testtab1";
+
+  protected static HiveConf conf = null;
+  protected Connection hs2Conn = null;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+    // Point HiveConf at the LLAP test hive-site.xml before constructing conf below.
+    String confDir = "../../data/conf/llap/";
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+
+    conf = new HiveConf();
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
+    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
+    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
+    conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
+    conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
+    conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
+
+    // Tez settings for the LLAP mini cluster also come from the test conf dir.
+    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+        + "/tez-site.xml"));
+
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
+    Map<String, String> confOverlay = new HashMap<>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+  }
+  /** Opens a fresh JDBC connection to MiniHS2 for each test. */
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+  }
+  /** Calls LlapBaseInputFormat.closeAll() and closes the per-test connection. */
+  @After
+  public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
+    hs2Conn.close();
+  }
+  /** Stops MiniHS2 if beforeTest() got far enough to create and start it. */
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+  /** Checks get_splits row counts for plain and ORDER BY queries, with and without a forced single split. */
+  @Test(timeout = 200000)
+  public void testGenericUDTFOrderBySplitCount1() throws Exception {
+    String query = "select get_splits(" + "'select value from " + tableName + "', 5)";
+    runQuery(query, getConfigs(), 10);
+    // Outer ORDER BY: a single split is returned by default.
+    query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)";
+    runQuery(query, getConfigs(), 1);
+    // ORDER BY inside a subquery must not be optimized away, so it also yields one split.
+    query = "select get_splits(" +
+        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, getConfigs(), 1);
+    // With force.single.split disabled, the ordered query is split like any other.
+    List<String> setCmds = getConfigs();
+    setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
+    query = "select get_splits(" +
+        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, setCmds, 10);
+  }
+  /** Recreates the test table, applies setCmds, runs query and asserts it returns numRows rows. */
+  private void runQuery(final String query, final List<String> setCmds,
+      final int numRows) throws Exception {
+
+    Connection con = hs2Conn;
+    BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
+
+    final PrintStream origErr = System.err;
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setErr(new PrintStream(baos)); // capture stderr; the original stream is restored below
+    Throwable throwable = null;
+    int rowCount = 0;
+    try (Statement selStmt = con.createStatement()) {
+      try {
+        if (setCmds != null) {
+          for (String setCmd : setCmds) {
+            selStmt.execute(setCmd);
+          }
+        }
+        try (ResultSet resultSet = selStmt.executeQuery(query)) {
+          while (resultSet.next()) {
+            rowCount++;
+          }
+        }
+      } catch (SQLException e) {
+        throwable = e;
+      }
+    } finally {
+      System.setErr(origErr);
+      baos.close();
+    }
+    assertNull("Query failed: " + throwable, throwable);
+    System.out.println("Expected " + numRows + " rows for query '" + query + "'. Got: " + rowCount);
+    assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows, rowCount);
+  }
+  /** Session settings that shrink split sizes so a plain table scan yields multiple splits. */
+  List<String> getConfigs(String... more) {
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
+    setCmds.add("set mapred.min.split.size=10");
+    setCmds.add("set mapred.max.split.size=10");
+    setCmds.add("set tez.grouping.min-size=10");
+    setCmds.add("set tez.grouping.max-size=10");
+    // to get at least 10 splits
+    setCmds.add("set tez.grouping.split-waves=10");
+    if (more != null) {
+      setCmds.addAll(Arrays.asList(more));
+    }
+    return setCmds;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 57f6c66..5dc4a1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -29,6 +29,9 @@ import java.util.Set;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.tez.common.counters.TezCounters;
@@ -86,13 +89,15 @@ public class HiveSplitGenerator extends InputInitializer {
private final MapWork work;
private final SplitGrouper splitGrouper = new SplitGrouper();
private final SplitLocationProvider splitLocationProvider;
+ private boolean generateSingleSplit;
- public HiveSplitGenerator(Configuration conf, MapWork work) throws IOException {
+ public HiveSplitGenerator(Configuration conf, MapWork work, final boolean generateSingleSplit) throws IOException {
super(null);
this.conf = conf;
this.work = work;
this.jobConf = new JobConf(conf);
+ this.generateSingleSplit = generateSingleSplit;
// Assuming grouping enabled always.
userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
@@ -199,8 +204,27 @@ public class HiveSplitGenerator extends InputInitializer {
conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
- // Raw splits
- InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+        InputSplit[] splits;
+        if (generateSingleSplit) { // ORDER BY output must be read back in order, i.e. as one split
+          List<Path> paths = Utilities.getInputPathsTez(jobConf, Utilities.getMapWork(jobConf));
+          Preconditions.checkState(paths.size() == 1, "Single split requested; expected 1 path, got %s", paths.size());
+          FileSystem fs = paths.get(0).getFileSystem(jobConf);
+          FileStatus[] fileStatuses = fs.listStatus(paths.get(0));
+          if (fileStatuses.length == 0) {
+            splits = new InputSplit[0]; // no output file (e.g. empty ORDER BY result): nothing to read
+          } else {
+            Preconditions.checkState(fileStatuses.length == 1, "Single split requested; expected 1 file, got %s", fileStatuses.length);
+            FileStatus fileStatus = fileStatuses[0];
+            BlockLocation[] locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+            Set<String> hostsSet = new HashSet<>(); // union of hosts holding any block of the file
+            for (BlockLocation location : locations) {
+              hostsSet.addAll(Arrays.asList(location.getHosts()));
+            }
+            splits = new InputSplit[] {new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hostsSet.toArray(new String[0]))};
+          }
+        } else {
+          splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); // raw splits
+        }
// Sort the splits, so that subsequent grouping is consistent.
Arrays.sort(splits, new InputSplitComparator());
LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 25a0ef2..a29b560 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -88,6 +87,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
@@ -130,6 +130,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
protected transient IntObjectInspector intOI;
protected transient JobConf jc;
private boolean orderByQuery;
+ private boolean forceSingleSplit;
private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
private DataOutput dos = new DataOutputStream(bos);
@@ -204,14 +205,12 @@ public class GenericUDTFGetSplits extends GenericUDTF {
TezWork tezWork = fragment.work;
Schema schema = fragment.schema;
- if (orderByQuery) {
- jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, false);
- jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, true);
- jc.setInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT, 1);
- }
+ boolean generateSingleSplit = forceSingleSplit && orderByQuery;
try {
- InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId);
- if (orderByQuery && splits.length > 1) {
+ InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
+ LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query,
+ orderByQuery, forceSingleSplit);
+ if (generateSingleSplit && splits.length > 1) {
throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query);
}
for (InputSplit s : splits) {
@@ -243,6 +242,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
// Tez/LLAP requires RPC query plan
HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
HiveConf.setBoolVar(conf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false);
+ // spark-llap always wraps query under a subquery, until that is removed from spark-llap
+ // hive compiler is going to remove inner order by. disable that optimization until then.
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false);
try {
jc = DagUtils.getInstance().createConfiguration(conf);
@@ -267,9 +269,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
}
QueryPlan plan = driver.getPlan();
- if (plan.getQueryProperties().hasOuterOrderBy()) {
- orderByQuery = true;
- }
+ orderByQuery = plan.getQueryProperties().hasOrderBy() || plan.getQueryProperties().hasOuterOrderBy();
+ forceSingleSplit = orderByQuery &&
+ HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT);
List<Task<?>> roots = plan.getRootTasks();
Schema schema = convertSchema(plan.getResultSchema());
if(num == 0) {
@@ -365,7 +367,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
}
}
- public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
+ public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId,
+ final boolean generateSingleSplit)
throws IOException {
if(numSplits == 0) {
@@ -413,10 +416,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
Preconditions.checkState(HiveConf.getBoolVar(wxConf,
ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
-
- HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
+ HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, generateSingleSplit);
List<Event> eventList = splitGenerator.initialize();
-
InputSplit[] result = new InputSplit[eventList.size() - 1];
InputConfigureVertexTasksEvent configureEvent