Posted to commits@hive.apache.org by ha...@apache.org on 2018/06/09 20:11:21 UTC

hive git commit: HIVE-19794 : Disable removing order by from subquery in GenericUDTFGetSplits (Prasanth J via Jason Dere)

Repository: hive
Updated Branches:
  refs/heads/master 2493e4a41 -> 95ea9f535


HIVE-19794 : Disable removing order by from subquery in GenericUDTFGetSplits (Prasanth J via Jason Dere)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95ea9f53
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95ea9f53
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95ea9f53

Branch: refs/heads/master
Commit: 95ea9f535f0439391b7b8ddc411f5f097d6dc818
Parents: 2493e4a
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Sat Jun 9 13:10:48 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Jun 9 13:10:48 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../hive/jdbc/TestJdbcGenericUDTFGetSplits.java | 187 +++++++++++++++++++
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |  30 ++-
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  31 +--
 4 files changed, 235 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
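
For context on the change: LLAP external clients such as spark-llap wrap the user query in a subquery, and the compiler's remove-order-by-in-subquery optimization could silently drop the inner ORDER BY. This patch disables that optimization inside GenericUDTFGetSplits and, by default, forces ORDER BY queries down to a single split. A minimal client-side sketch of the query shape involved (the JDBC URL, credentials, and table name are placeholders, not from this patch; the wrapped query mirrors the new test):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class GetSplitsOrderBySketch {
      public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection(
                 "jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = con.createStatement()) {
          // spark-llap wraps the user query in a subquery; before this patch
          // the compiler could remove the inner "order by" as a subquery no-op.
          String wrapped = "select get_splits('select `value` from "
              + "(select value from testtab1 order by value) as t', 5)";
          int splits = 0;
          try (ResultSet rs = stmt.executeQuery(wrapped)) {
            while (rs.next()) {
              splits++; // get_splits forwards one row per split
            }
          }
          // With hive.llap.external.splits.order.by.force.single.split=true
          // (the default), an ORDER BY query should yield exactly one split.
          System.out.println("splits: " + splits);
        }
      }
    }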


http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b5e2d86..87db42a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4172,6 +4172,11 @@ public class HiveConf extends Configuration {
     LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT("hive.llap.external.splits.temp.table.storage.format",
         "orc", new StringSet("default", "text", "orc"),
         "Storage format for temp tables created using LLAP external client"),
+    LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT("hive.llap.external.splits.order.by.force.single.split",
+      true,
+      "If LLAP external clients submit ORDER BY queries, force a single split to be returned so that the\n" +
+        "data is read out in order. Setting this to false lets external clients read data out in parallel,\n" +
+        "losing the ordering (external clients are then responsible for guaranteeing the ordering)"),
     LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
         "Override if grace join should be allowed to run in llap."),
 
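Clients that prefer parallel reads over server-side ordering can opt out per session. A minimal sketch, assuming an open java.sql.Statement against HiveServer2 (the helper class and method names are illustrative):

    import java.sql.SQLException;
    import java.sql.Statement;

    final class SplitConfigHelper {
      // Opt out of the single-split guarantee for the current session. After
      // this, ORDER BY queries submitted through get_splits may come back as
      // multiple splits, and the client must restore the ordering itself.
      static void allowParallelOrderedReads(Statement stmt) throws SQLException {
        stmt.execute("set hive.llap.external.splits.order.by.force.single.split=false");
      }
    }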

http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
new file mode 100644
index 0000000..c8a428c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+import org.apache.hadoop.hive.ql.wm.Action;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.Expression;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestJdbcGenericUDTFGetSplits {
+  protected static MiniHS2 miniHS2 = null;
+  protected static String dataFileDir;
+  static Path kvDataFilePath;
+  protected static String tableName = "testtab1";
+
+  protected static HiveConf conf = null;
+  protected Connection hs2Conn = null;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+
+    String confDir = "../../data/conf/llap/";
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+
+    conf = new HiveConf();
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
+    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
+    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
+    conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
+    conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
+    conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
+
+
+    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+      + "/tez-site.xml"));
+
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
+    Map<String, String> confOverlay = new HashMap<>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
+    hs2Conn.close();
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  @Test(timeout = 200000)
+  public void testGenericUDTFOrderBySplitCount1() throws Exception {
+    String query = "select get_splits(" + "'select value from " + tableName + "', 5)";
+    runQuery(query, getConfigs(), 10);
+
+    query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)";
+    runQuery(query, getConfigs(), 1);
+
+    query = "select get_splits(" +
+      "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, getConfigs(), 1);
+
+    List<String> setCmds = getConfigs();
+    setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
+    query = "select get_splits(" +
+      "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, setCmds, 10);
+  }
+
+  private void runQuery(final String query, final List<String> setCmds,
+    final int numRows) throws Exception {
+
+    Connection con = hs2Conn;
+    BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setErr(new PrintStream(baos)); // capture stderr
+    final Statement selStmt = con.createStatement();
+    Throwable throwable = null;
+    int rowCount = 0;
+    try {
+      try {
+        if (setCmds != null) {
+          for (String setCmd : setCmds) {
+            selStmt.execute(setCmd);
+          }
+        }
+        ResultSet resultSet = selStmt.executeQuery(query);
+        while(resultSet.next()) {
+          rowCount++;
+        }
+      } catch (SQLException e) {
+        throwable = e;
+      }
+      selStmt.close();
+      assertNull(throwable);
+      System.out.println("Expected " + numRows + " rows for query '" + query + "'. Got: " + rowCount);
+      assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows, rowCount);
+    } finally {
+      baos.close();
+    }
+
+  }
+
+  List<String> getConfigs(String... more) {
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
+    setCmds.add("set mapred.min.split.size=10");
+    setCmds.add("set mapred.max.split.size=10");
+    setCmds.add("set tez.grouping.min-size=10");
+    setCmds.add("set tez.grouping.max-size=10");
+    // to get at least 10 splits
+    setCmds.add("set tez.grouping.split-waves=10");
+    if (more != null) {
+      setCmds.addAll(Arrays.asList(more));
+    }
+    return setCmds;
+  }
+}
\ No newline at end of file
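
For reference, a consumer of these splits would typically go through the llap-ext-client input format rather than raw JDBC. A hedged sketch, assuming the LlapRowInputFormat / Row API and the llap.if.* configuration keys on LlapBaseInputFormat; only LlapBaseInputFormat.closeAll(), used in the test above, appears in this diff, and the connection values are placeholders:

    import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
    import org.apache.hadoop.hive.llap.LlapRowInputFormat;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class LlapSplitConsumerSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Placeholder connection settings.
        job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default");
        job.set(LlapBaseInputFormat.USER_KEY, "hive");
        job.set(LlapBaseInputFormat.PWD_KEY, "");
        job.set(LlapBaseInputFormat.QUERY_KEY, "select value from testtab1 order by value");

        LlapRowInputFormat inputFormat = new LlapRowInputFormat();
        // With the new flag at its default, an ORDER BY query should yield one split.
        InputSplit[] splits = inputFormat.getSplits(job, 5);
        for (InputSplit split : splits) {
          RecordReader<NullWritable, Row> reader =
              inputFormat.getRecordReader(split, job, Reporter.NULL);
          NullWritable key = reader.createKey();
          Row row = reader.createValue();
          while (reader.next(key, row)) {
            System.out.println(row.getValue(0)); // assumed accessor
          }
          reader.close();
        }
        LlapBaseInputFormat.closeAll();
      }
    }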

http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 57f6c66..5dc4a1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -29,6 +29,9 @@ import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.tez.common.counters.TezCounters;
@@ -86,13 +89,15 @@ public class HiveSplitGenerator extends InputInitializer {
   private final MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
   private final SplitLocationProvider splitLocationProvider;
+  private boolean generateSingleSplit;
 
-  public HiveSplitGenerator(Configuration conf, MapWork work) throws IOException {
+  public HiveSplitGenerator(Configuration conf, MapWork work, final boolean generateSingleSplit) throws IOException {
     super(null);
 
     this.conf = conf;
     this.work = work;
     this.jobConf = new JobConf(conf);
+    this.generateSingleSplit = generateSingleSplit;
 
     // Assuming grouping enabled always.
     userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
@@ -199,8 +204,27 @@ public class HiveSplitGenerator extends InputInitializer {
           conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
             TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
-        // Raw splits
-        InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+        InputSplit[] splits;
+        if (generateSingleSplit) {
+          splits = new InputSplit[1];
+          List<Path> paths = Utilities.getInputPathsTez(jobConf, Utilities.getMapWork(jobConf));
+          FileSystem fs = paths.get(0).getFileSystem(jobConf);
+          FileStatus[] fileStatuses = fs.listStatus(paths.get(0));
+          FileStatus fileStatus = fileStatuses[0];
+          Preconditions.checkState(paths.size() == 1 && fileStatuses.length == 1, "Requested to generate single " +
+            "split. Paths and fileStatuses are expected to be 1. Got paths: " + paths.size() + " fileStatuses: " +
+            fileStatuses.length);
+          BlockLocation[] locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+          Set<String> hostsSet = new HashSet<>();
+          for (BlockLocation location : locations) {
+            hostsSet.addAll(Lists.newArrayList(location.getHosts()));
+          }
+          String[] hosts = hostsSet.toArray(new String[0]);
+          splits[0] = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts);
+        } else {
+          // Raw splits
+          splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+        }
         // Sort the splits, so that subsequent grouping is consistent.
         Arrays.sort(splits, new InputSplitComparator());
         LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
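
The new branch above collapses a single-file input into one FileSplit spanning the whole file, with the union of all block hosts as its locations, so the ordered output is read back as one contiguous stream. The same idea as a standalone sketch (the helper name and the exactly-one-file assumption are illustrative):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;

    final class SingleSplitSketch {
      // Build one split covering the whole (single) input file, so ordered
      // output is read back as one contiguous stream.
      static FileSplit singleSplitFor(Path inputDir, JobConf job) throws IOException {
        FileSystem fs = inputDir.getFileSystem(job);
        FileStatus file = fs.listStatus(inputDir)[0]; // assumes exactly one file
        BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
        Set<String> hosts = new HashSet<>();
        for (BlockLocation block : blocks) {
          hosts.addAll(Arrays.asList(block.getHosts()));
        }
        return new FileSplit(file.getPath(), 0, file.getLen(),
            hosts.toArray(new String[0]));
      }
    }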

http://git-wip-us.apache.org/repos/asf/hive/blob/95ea9f53/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 25a0ef2..a29b560 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -88,6 +87,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SplitLocationInfo;
@@ -130,6 +130,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   protected transient IntObjectInspector intOI;
   protected transient JobConf jc;
   private boolean orderByQuery;
+  private boolean forceSingleSplit;
   private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
   private DataOutput dos = new DataOutputStream(bos);
 
@@ -204,14 +205,12 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
-    if (orderByQuery) {
-      jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, false);
-      jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, true);
-      jc.setInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT, 1);
-    }
+    boolean generateSingleSplit = forceSingleSplit && orderByQuery;
     try {
-      InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId);
-      if (orderByQuery && splits.length > 1) {
+      InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
+      LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query,
+        orderByQuery, forceSingleSplit);
+      if (generateSingleSplit && splits.length > 1) {
         throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query);
       }
       for (InputSplit s : splits) {
@@ -243,6 +242,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     // Tez/LLAP requires RPC query plan
     HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
     HiveConf.setBoolVar(conf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false);
+    // spark-llap always wraps the query in a subquery; until that is removed on the spark-llap side,
+    // the hive compiler would strip the inner order by. Disable that optimization until then.
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false);
 
     try {
       jc = DagUtils.getInstance().createConfiguration(conf);
@@ -267,9 +269,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       }
 
       QueryPlan plan = driver.getPlan();
-      if (plan.getQueryProperties().hasOuterOrderBy()) {
-        orderByQuery = true;
-      }
+      orderByQuery = plan.getQueryProperties().hasOrderBy() || plan.getQueryProperties().hasOuterOrderBy();
+      forceSingleSplit = orderByQuery &&
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT);
       List<Task<?>> roots = plan.getRootTasks();
       Schema schema = convertSchema(plan.getResultSchema());
       if(num == 0) {
@@ -365,7 +367,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId,
+    final boolean generateSingleSplit)
     throws IOException {
 
     if(numSplits == 0) {
@@ -413,10 +416,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
               ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
 
-
-      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
+      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, generateSingleSplit);
       List<Event> eventList = splitGenerator.initialize();
-
       InputSplit[] result = new InputSplit[eventList.size() - 1];
 
       InputConfigureVertexTasksEvent configureEvent