You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/01/20 22:17:42 UTC

[1/2] hive git commit: HIVE-15554 : Add task information to LLAP AM heartbeat (Sergey Shelukhin, reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master 811b3e39e -> ab7f6f31f


HIVE-15554 : Add task information to LLAP AM heartbeat (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9cde56db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9cde56db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9cde56db

Branch: refs/heads/master
Commit: 9cde56dbe9498c9a1114581d14855a305b95026f
Parents: 811b3e3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Jan 20 14:11:49 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Jan 20 14:17:32 2017 -0800

----------------------------------------------------------------------
 .../ext/LlapTaskUmbilicalExternalClient.java    |  42 ++++++--
 .../protocol/LlapTaskUmbilicalProtocol.java     |  12 ++-
 .../hive/llap/daemon/impl/AMReporter.java       | 105 +++++++++++++------
 .../llap/daemon/impl/TaskRunnerCallable.java    |  17 +--
 .../llap/tezplugins/LlapTaskCommunicator.java   |  25 ++++-
 5 files changed, 148 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 3e0232d..7d0d6d2 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -16,6 +16,14 @@
  */
 package org.apache.hadoop.hive.llap.ext;
 
+import org.apache.hadoop.io.Writable;
+
+import java.util.HashSet;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
+
+import org.apache.hadoop.io.ArrayWritable;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -222,9 +230,15 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     }
   }
 
-  private void updateHeartbeatInfo(String hostname, String uniqueId, int port) {
+  private void updateHeartbeatInfo(
+      String hostname, String uniqueId, int port, TezAttemptArray tasks) {
     int updateCount = 0;
+    HashSet<TezTaskAttemptID> attempts = new HashSet<>();
+    for (Writable w : tasks.get()) {
+      attempts.add((TezTaskAttemptID)w);
+    }
 
+    String error = "";
     for (String key : pendingEvents.keySet()) {
       PendingEventData pendingEventData = pendingEvents.get(key);
       if (pendingEventData != null) {
@@ -232,8 +246,13 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
         String thiUniqueId = thi.uniqueNodeId;
         if (thi.hostname.equals(hostname) && thi.port == port
             && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
-          thi.lastHeartbeat.set(System.currentTimeMillis());
-          updateCount++;
+          TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
+          if (attempts.contains(ta)) {
+            thi.lastHeartbeat.set(System.currentTimeMillis());
+            updateCount++;
+          } else {
+            error += (thi.taskAttemptId + ", ");
+          }
         }
       }
     }
@@ -244,11 +263,19 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
         String thiUniqueId = thi.uniqueNodeId;
         if (thi.hostname.equals(hostname) && thi.port == port
             && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
-          thi.lastHeartbeat.set(System.currentTimeMillis());
-          updateCount++;
+          TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
+          if (attempts.contains(ta)) {
+            thi.lastHeartbeat.set(System.currentTimeMillis());
+            updateCount++;
+          } else {
+            error += (thi.taskAttemptId + ", ");
+          }
         }
       }
     }
+    if (!error.isEmpty()) {
+      LOG.info("The tasks we expected to be on the node are not there: " + error);
+    }
 
     if (updateCount == 0) {
       LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
@@ -395,8 +422,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     }
 
     @Override
-    public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
-      updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port);
+    public void nodeHeartbeat(
+        Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+      updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw);
       // No need to propagate to this to the responder
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index dbfe54b..a2dca1b 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -14,6 +14,8 @@
 
 package org.apache.hadoop.hive.llap.protocol;
 
+import org.apache.hadoop.io.ArrayWritable;
+
 import java.io.IOException;
 
 import org.apache.hadoop.io.Text;
@@ -28,6 +30,13 @@ import org.apache.tez.runtime.common.security.JobTokenSelector;
 @TokenInfo(JobTokenSelector.class)
 public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
 
+  // Why are we still using writables in 2017?
+  public class TezAttemptArray extends ArrayWritable {
+    public TezAttemptArray() {
+      super(TezTaskAttemptID.class);
+    }
+  }
+
   public static final long versionID = 1L;
 
   // From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers
@@ -35,7 +44,8 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException;
 
-  public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException;
+  public void nodeHeartbeat(
+      Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException;
 
   public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 93f8073..027d8eb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -14,6 +14,20 @@
 
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+import java.util.ArrayList;
+
+import java.util.List;
+
+import java.util.HashSet;
+
+import java.util.Set;
+
+import java.util.concurrent.ConcurrentHashMap;
+
 import javax.net.SocketFactory;
 
 import java.io.IOException;
@@ -175,7 +189,8 @@ public class AMReporter extends AbstractService {
   }
 
   public void registerTask(String amLocation, int port, String user,
-                           Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier) {
+      Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
+      TezTaskAttemptID attemptId) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier);
     }
@@ -184,9 +199,8 @@ public class AMReporter extends AbstractService {
       LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
       amNodeInfo = knownAppMasters.get(amNodeId);
       if (amNodeInfo == null) {
-        amNodeInfo =
-            new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
-                conf);
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier,
+            retryPolicy, retryTimeout, socketFactory, conf);
         knownAppMasters.put(amNodeId, amNodeInfo);
         // Add to the queue only the first time this is registered, and on
         // subsequent instances when it's taken off the queue.
@@ -194,11 +208,11 @@ public class AMReporter extends AbstractService {
         pendingHeartbeatQueeu.add(amNodeInfo);
       }
       amNodeInfo.setCurrentQueryIdentifier(queryIdentifier);
-      amNodeInfo.incrementAndGetTaskCount();
+      amNodeInfo.addTaskAttempt(attemptId);
     }
   }
 
-  public void unregisterTask(String amLocation, int port) {
+  public void unregisterTask(String amLocation, int port, TezTaskAttemptID ta) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port);
     }
@@ -209,7 +223,7 @@ public class AMReporter extends AbstractService {
       if (amNodeInfo == null) {
         LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port));
       } else {
-        amNodeInfo.decrementAndGetTaskCount();
+        amNodeInfo.removeTaskAttempt(ta);
       }
       // Not removing this here. Will be removed when taken off the queue and discovered to have 0
       // pending tasks.
@@ -334,29 +348,33 @@ public class AMReporter extends AbstractService {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo);
       }
-      if (amNodeInfo.getTaskCount() > 0) {
-        try {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("NodeHeartbeat to: " + amNodeInfo);
-          }
-          amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
-              new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort());
-        } catch (IOException e) {
-          QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
-          amNodeInfo.setAmFailed(true);
-          LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
-              amNodeInfo.amNodeId, currentQueryIdentifier, e);
-          queryFailedHandler.queryFailed(currentQueryIdentifier);
-        } catch (InterruptedException e) {
-          if (!isShutdown.get()) {
-            LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
-          }
-        }
-      } else {
+      List<TezTaskAttemptID> tasks = amNodeInfo.getTasksSnapshot();
+      if (tasks.isEmpty()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0");
         }
+        return null;
       }
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("NodeHeartbeat to: " + amNodeInfo);
+        }
+        TezAttemptArray aw = new TezAttemptArray();
+        aw.set(tasks.toArray(new TezTaskAttemptID[tasks.size()]));
+        amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
+            new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw);
+      } catch (IOException e) {
+        QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+        amNodeInfo.setAmFailed(true);
+        LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
+            amNodeInfo.amNodeId, currentQueryIdentifier, e);
+        queryFailedHandler.queryFailed(currentQueryIdentifier);
+      } catch (InterruptedException e) {
+        if (!isShutdown.get()) {
+          LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
+        }
+      }
+
       return null;
     }
   }
@@ -364,7 +382,8 @@ public class AMReporter extends AbstractService {
 
 
   private static class AMNodeInfo implements Delayed {
-    private final AtomicInteger taskCount = new AtomicInteger(0);
+    // Serves as lock for itself.
+    private final Set<TezTaskAttemptID> tasks = new HashSet<>();
     private final String user;
     private final Token<JobTokenIdentifier> jobToken;
     private final Configuration conf;
@@ -422,12 +441,22 @@ public class AMReporter extends AbstractService {
       umbilical = null;
     }
 
-    int incrementAndGetTaskCount() {
-      return taskCount.incrementAndGet();
+    int addTaskAttempt(TezTaskAttemptID attemptId) {
+      synchronized (tasks) {
+        if (!tasks.add(attemptId)) {
+          throw new RuntimeException(attemptId + " was already registered");
+        }
+        return tasks.size();
+      }
     }
 
-    int decrementAndGetTaskCount() {
-      return taskCount.decrementAndGet();
+    int removeTaskAttempt(TezTaskAttemptID attemptId) {
+      synchronized (tasks) {
+        if (!tasks.remove(attemptId)) {
+          throw new RuntimeException(attemptId + " was not registered and couldn't be removed");
+        }
+        return tasks.size();
+      }
     }
 
     void setAmFailed(boolean val) {
@@ -438,8 +467,12 @@ public class AMReporter extends AbstractService {
       return amFailed.get();
     }
 
-    int getTaskCount() {
-      return taskCount.get();
+    List<TezTaskAttemptID> getTasksSnapshot() {
+      List<TezTaskAttemptID> result = new ArrayList<>();
+      synchronized (tasks) {
+        result.addAll(tasks);
+      }
+      return result;
     }
 
     public synchronized QueryIdentifier getCurrentQueryIdentifier() {
@@ -475,5 +508,11 @@ public class AMReporter extends AbstractService {
     public String toString() {
       return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount();
     }
+
+    private int getTaskCount() {
+      synchronized (tasks) {
+        return tasks.size();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index d8689ba..bfb155a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -141,7 +141,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     // Register with the AMReporter when the callable is setup. Unregister once it starts running.
     if (amReporter != null && jobToken != null) {
       this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
-          vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
+          vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId);
     }
     this.metrics = metrics;
     this.requestId = taskSpec.getTaskAttemptID().toString();
@@ -172,11 +172,12 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       }
 
       // Unregister from the AMReporter, since the task is now running.
-      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+      TezTaskAttemptID ta = taskSpec.getTaskAttemptID();
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta);
 
       synchronized (this) {
         if (!shouldRunTask) {
-          LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+          LOG.info("Not starting task {} since it was killed earlier", ta);
           return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
         }
       }
@@ -290,19 +291,19 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     if (!isCompleted.get()) {
       if (!killInvoked.getAndSet(true)) {
         synchronized (this) {
-          LOG.info("Kill task requested for id={}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(),
-              (taskRunner != null));
+          TezTaskAttemptID ta = taskSpec.getTaskAttemptID();
+          LOG.info("Kill task requested for id={}, taskRunnerSetup={}", ta, taskRunner != null);
           if (taskRunner != null) {
             killtimerWatch.start();
             LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
             boolean killed = taskRunner.killTask();
             if (killed) {
               // Sending a kill message to the AM right here. Don't need to wait for the task to complete.
-              LOG.info("Kill request for task {} completed. Informing AM", taskSpec.getTaskAttemptID());
+              LOG.info("Kill request for task {} completed. Informing AM", ta);
               reportTaskKilled();
             } else {
               LOG.info("Kill request for task {} did not complete because the task is already complete",
-                  taskSpec.getTaskAttemptID());
+                  ta);
             }
             shouldRunTask = false;
           } else {
@@ -314,7 +315,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
             // If the task hasn't started - inform about fragment completion immediately. It's possible for
             // the callable to never run.
             fragmentCompletionHanler.fragmentComplete(fragmentInfo);
-            this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+            this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta);
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 1de4bfe..c7f4ce8 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -14,6 +14,10 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
@@ -565,13 +569,18 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
   private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
 
-  void nodePinged(String hostname, String uniqueId, int port) {
+  void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray tasks) {
     // TODO: do we ever need the port? we could just do away with nodeId altogether.
     LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
     registerPingingNode(nodeId);
     BiMap<ContainerId, TezTaskAttemptID> biMap =
         entityTracker.getContainerAttemptMapForNode(nodeId);
     if (biMap != null) {
+      HashSet<TezTaskAttemptID> attempts = new HashSet<>();
+      for (Writable w : tasks.get()) {
+        attempts.add((TezTaskAttemptID)w);
+      }
+      String error = "";
       synchronized (biMap) {
         for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
           // TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
@@ -584,11 +593,18 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
           // However, the next heartbeat(s) should get the value eventually and mark task as alive.
           // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
           if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
-            getContext().taskAlive(entry.getValue());
+            if (attempts.contains(attemptId)) {
+              getContext().taskAlive(entry.getValue());
+            } else {
+              error += (attemptId + ", ");
+            }
             getContext().containerAlive(entry.getKey());
           }
         }
       }
+      if (!error.isEmpty()) {
+        LOG.info("The tasks we expected to be on the node are not there: " + error);
+      }
     } else {
       long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
       if (currentTs > nodeNotFoundLogTime.get() + 5000l) {
@@ -679,8 +695,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
 
     @Override
-    public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
-      nodePinged(hostname.toString(), uniqueId.toString(), port);
+    public void nodeHeartbeat(
+        Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+      nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
       }


[2/2] hive git commit: HIVE-15390 : Orc reader unnecessarily reading stripe footers with hive.optimize.index.filter set to true (Abhishek Somani, reviewed by Sergey Shelukhin and Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-15390 : Orc reader unnecessarily reading stripe footers with hive.optimize.index.filter set to true (Abhishek Somani, reviewed by Sergey Shelukhin and Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ab7f6f31
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ab7f6f31
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ab7f6f31

Branch: refs/heads/master
Commit: ab7f6f31fb85e49fd41ad1400f6e3aaa59fc95a8
Parents: 9cde56d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Jan 20 14:17:13 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Jan 20 14:17:33 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java   | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ab7f6f31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index fb7a6b2..dd53afa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -455,7 +455,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       ReaderKey key = new ReaderKey();
       if (isOriginal) {
         options = options.clone();
-        options.range(options.getOffset(), Long.MAX_VALUE);
         pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey,
                                       options);
       } else {