You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/01/20 22:17:42 UTC
[1/2] hive git commit: HIVE-15554 : Add task information to LLAP AM heartbeat (Sergey Shelukhin, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master 811b3e39e -> ab7f6f31f
HIVE-15554 : Add task information to LLAP AM heartbeat (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9cde56db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9cde56db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9cde56db
Branch: refs/heads/master
Commit: 9cde56dbe9498c9a1114581d14855a305b95026f
Parents: 811b3e3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Jan 20 14:11:49 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Jan 20 14:17:32 2017 -0800
----------------------------------------------------------------------
.../ext/LlapTaskUmbilicalExternalClient.java | 42 ++++++--
.../protocol/LlapTaskUmbilicalProtocol.java | 12 ++-
.../hive/llap/daemon/impl/AMReporter.java | 105 +++++++++++++------
.../llap/daemon/impl/TaskRunnerCallable.java | 17 +--
.../llap/tezplugins/LlapTaskCommunicator.java | 25 ++++-
5 files changed, 148 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 3e0232d..7d0d6d2 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -16,6 +16,14 @@
*/
package org.apache.hadoop.hive.llap.ext;
+import org.apache.hadoop.io.Writable;
+
+import java.util.HashSet;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
+
+import org.apache.hadoop.io.ArrayWritable;
+
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -222,9 +230,15 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
}
- private void updateHeartbeatInfo(String hostname, String uniqueId, int port) {
+ private void updateHeartbeatInfo(
+ String hostname, String uniqueId, int port, TezAttemptArray tasks) {
int updateCount = 0;
+ HashSet<TezTaskAttemptID> attempts = new HashSet<>();
+ for (Writable w : tasks.get()) {
+ attempts.add((TezTaskAttemptID)w);
+ }
+ String error = "";
for (String key : pendingEvents.keySet()) {
PendingEventData pendingEventData = pendingEvents.get(key);
if (pendingEventData != null) {
@@ -232,8 +246,13 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
String thiUniqueId = thi.uniqueNodeId;
if (thi.hostname.equals(hostname) && thi.port == port
&& (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
- thi.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
+ TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
+ if (attempts.contains(ta)) {
+ thi.lastHeartbeat.set(System.currentTimeMillis());
+ updateCount++;
+ } else {
+ error += (thi.taskAttemptId + ", ");
+ }
}
}
}
@@ -244,11 +263,19 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
String thiUniqueId = thi.uniqueNodeId;
if (thi.hostname.equals(hostname) && thi.port == port
&& (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
- thi.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
+ TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
+ if (attempts.contains(ta)) {
+ thi.lastHeartbeat.set(System.currentTimeMillis());
+ updateCount++;
+ } else {
+ error += (thi.taskAttemptId + ", ");
+ }
}
}
}
+ if (!error.isEmpty()) {
+ LOG.info("The tasks we expected to be on the node are not there: " + error);
+ }
if (updateCount == 0) {
LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
@@ -395,8 +422,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
@Override
- public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
- updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port);
+ public void nodeHeartbeat(
+ Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+ updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw);
// No need to propagate to this to the responder
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index dbfe54b..a2dca1b 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -14,6 +14,8 @@
package org.apache.hadoop.hive.llap.protocol;
+import org.apache.hadoop.io.ArrayWritable;
+
import java.io.IOException;
import org.apache.hadoop.io.Text;
@@ -28,6 +30,13 @@ import org.apache.tez.runtime.common.security.JobTokenSelector;
@TokenInfo(JobTokenSelector.class)
public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
+ // Why are we still using writables in 2017?
+ public class TezAttemptArray extends ArrayWritable {
+ public TezAttemptArray() {
+ super(TezTaskAttemptID.class);
+ }
+ }
+
public static final long versionID = 1L;
// From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers
@@ -35,7 +44,8 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
throws IOException, TezException;
- public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException;
+ public void nodeHeartbeat(
+ Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException;
public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 93f8073..027d8eb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -14,6 +14,20 @@
package org.apache.hadoop.hive.llap.daemon.impl;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+import java.util.ArrayList;
+
+import java.util.List;
+
+import java.util.HashSet;
+
+import java.util.Set;
+
+import java.util.concurrent.ConcurrentHashMap;
+
import javax.net.SocketFactory;
import java.io.IOException;
@@ -175,7 +189,8 @@ public class AMReporter extends AbstractService {
}
public void registerTask(String amLocation, int port, String user,
- Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier) {
+ Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
+ TezTaskAttemptID attemptId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier);
}
@@ -184,9 +199,8 @@ public class AMReporter extends AbstractService {
LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
amNodeInfo = knownAppMasters.get(amNodeId);
if (amNodeInfo == null) {
- amNodeInfo =
- new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
- conf);
+ amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier,
+ retryPolicy, retryTimeout, socketFactory, conf);
knownAppMasters.put(amNodeId, amNodeInfo);
// Add to the queue only the first time this is registered, and on
// subsequent instances when it's taken off the queue.
@@ -194,11 +208,11 @@ public class AMReporter extends AbstractService {
pendingHeartbeatQueeu.add(amNodeInfo);
}
amNodeInfo.setCurrentQueryIdentifier(queryIdentifier);
- amNodeInfo.incrementAndGetTaskCount();
+ amNodeInfo.addTaskAttempt(attemptId);
}
}
- public void unregisterTask(String amLocation, int port) {
+ public void unregisterTask(String amLocation, int port, TezTaskAttemptID ta) {
if (LOG.isTraceEnabled()) {
LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port);
}
@@ -209,7 +223,7 @@ public class AMReporter extends AbstractService {
if (amNodeInfo == null) {
LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port));
} else {
- amNodeInfo.decrementAndGetTaskCount();
+ amNodeInfo.removeTaskAttempt(ta);
}
// Not removing this here. Will be removed when taken off the queue and discovered to have 0
// pending tasks.
@@ -334,29 +348,33 @@ public class AMReporter extends AbstractService {
if (LOG.isTraceEnabled()) {
LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo);
}
- if (amNodeInfo.getTaskCount() > 0) {
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("NodeHeartbeat to: " + amNodeInfo);
- }
- amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
- new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort());
- } catch (IOException e) {
- QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
- amNodeInfo.setAmFailed(true);
- LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
- amNodeInfo.amNodeId, currentQueryIdentifier, e);
- queryFailedHandler.queryFailed(currentQueryIdentifier);
- } catch (InterruptedException e) {
- if (!isShutdown.get()) {
- LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
- }
- }
- } else {
+ List<TezTaskAttemptID> tasks = amNodeInfo.getTasksSnapshot();
+ if (tasks.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0");
}
+ return null;
}
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("NodeHeartbeat to: " + amNodeInfo);
+ }
+ TezAttemptArray aw = new TezAttemptArray();
+ aw.set(tasks.toArray(new TezTaskAttemptID[tasks.size()]));
+ amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
+ new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw);
+ } catch (IOException e) {
+ QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+ amNodeInfo.setAmFailed(true);
+ LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
+ amNodeInfo.amNodeId, currentQueryIdentifier, e);
+ queryFailedHandler.queryFailed(currentQueryIdentifier);
+ } catch (InterruptedException e) {
+ if (!isShutdown.get()) {
+ LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
+ }
+ }
+
return null;
}
}
@@ -364,7 +382,8 @@ public class AMReporter extends AbstractService {
private static class AMNodeInfo implements Delayed {
- private final AtomicInteger taskCount = new AtomicInteger(0);
+ // Serves as lock for itself.
+ private final Set<TezTaskAttemptID> tasks = new HashSet<>();
private final String user;
private final Token<JobTokenIdentifier> jobToken;
private final Configuration conf;
@@ -422,12 +441,22 @@ public class AMReporter extends AbstractService {
umbilical = null;
}
- int incrementAndGetTaskCount() {
- return taskCount.incrementAndGet();
+ int addTaskAttempt(TezTaskAttemptID attemptId) {
+ synchronized (tasks) {
+ if (!tasks.add(attemptId)) {
+ throw new RuntimeException(attemptId + " was already registered");
+ }
+ return tasks.size();
+ }
}
- int decrementAndGetTaskCount() {
- return taskCount.decrementAndGet();
+ int removeTaskAttempt(TezTaskAttemptID attemptId) {
+ synchronized (tasks) {
+ if (!tasks.remove(attemptId)) {
+ throw new RuntimeException(attemptId + " was not registered and couldn't be removed");
+ }
+ return tasks.size();
+ }
}
void setAmFailed(boolean val) {
@@ -438,8 +467,12 @@ public class AMReporter extends AbstractService {
return amFailed.get();
}
- int getTaskCount() {
- return taskCount.get();
+ List<TezTaskAttemptID> getTasksSnapshot() {
+ List<TezTaskAttemptID> result = new ArrayList<>();
+ synchronized (tasks) {
+ result.addAll(tasks);
+ }
+ return result;
}
public synchronized QueryIdentifier getCurrentQueryIdentifier() {
@@ -475,5 +508,11 @@ public class AMReporter extends AbstractService {
public String toString() {
return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount();
}
+
+ private int getTaskCount() {
+ synchronized (tasks) {
+ return tasks.size();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index d8689ba..bfb155a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -141,7 +141,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
if (amReporter != null && jobToken != null) {
this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
- vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
+ vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId);
}
this.metrics = metrics;
this.requestId = taskSpec.getTaskAttemptID().toString();
@@ -172,11 +172,12 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
}
// Unregister from the AMReporter, since the task is now running.
- this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+ TezTaskAttemptID ta = taskSpec.getTaskAttemptID();
+ this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta);
synchronized (this) {
if (!shouldRunTask) {
- LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+ LOG.info("Not starting task {} since it was killed earlier", ta);
return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
}
}
@@ -290,19 +291,19 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
if (!isCompleted.get()) {
if (!killInvoked.getAndSet(true)) {
synchronized (this) {
- LOG.info("Kill task requested for id={}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(),
- (taskRunner != null));
+ TezTaskAttemptID ta = taskSpec.getTaskAttemptID();
+ LOG.info("Kill task requested for id={}, taskRunnerSetup={}", ta, taskRunner != null);
if (taskRunner != null) {
killtimerWatch.start();
LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
boolean killed = taskRunner.killTask();
if (killed) {
// Sending a kill message to the AM right here. Don't need to wait for the task to complete.
- LOG.info("Kill request for task {} completed. Informing AM", taskSpec.getTaskAttemptID());
+ LOG.info("Kill request for task {} completed. Informing AM", ta);
reportTaskKilled();
} else {
LOG.info("Kill request for task {} did not complete because the task is already complete",
- taskSpec.getTaskAttemptID());
+ ta);
}
shouldRunTask = false;
} else {
@@ -314,7 +315,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
// If the task hasn't started - inform about fragment completion immediately. It's possible for
// the callable to never run.
fragmentCompletionHanler.fragmentComplete(fragmentInfo);
- this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+ this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta);
}
}
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/9cde56db/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 1de4bfe..c7f4ce8 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -14,6 +14,10 @@
package org.apache.hadoop.hive.llap.tezplugins;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
@@ -565,13 +569,18 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
- void nodePinged(String hostname, String uniqueId, int port) {
+ void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray tasks) {
// TODO: do we ever need the port? we could just do away with nodeId altogether.
LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
registerPingingNode(nodeId);
BiMap<ContainerId, TezTaskAttemptID> biMap =
entityTracker.getContainerAttemptMapForNode(nodeId);
if (biMap != null) {
+ HashSet<TezTaskAttemptID> attempts = new HashSet<>();
+ for (Writable w : tasks.get()) {
+ attempts.add((TezTaskAttemptID)w);
+ }
+ String error = "";
synchronized (biMap) {
for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
// TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
@@ -584,11 +593,18 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// However, the next heartbeat(s) should get the value eventually and mark task as alive.
// Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
- getContext().taskAlive(entry.getValue());
+ if (attempts.contains(attemptId)) {
+ getContext().taskAlive(entry.getValue());
+ } else {
+ error += (attemptId + ", ");
+ }
getContext().containerAlive(entry.getKey());
}
}
}
+ if (!error.isEmpty()) {
+ LOG.info("The tasks we expected to be on the node are not there: " + error);
+ }
} else {
long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
if (currentTs > nodeNotFoundLogTime.get() + 5000l) {
@@ -679,8 +695,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
@Override
- public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
- nodePinged(hostname.toString(), uniqueId.toString(), port);
+ public void nodeHeartbeat(
+ Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
+ nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
}
[2/2] hive git commit: HIVE-15390 : Orc reader unnecessarily reading stripe footers with hive.optimize.index.filter set to true (Abhishek Somani, reviewed by Sergey Shelukhin and Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-15390 : Orc reader unnecessarily reading stripe footers with hive.optimize.index.filter set to true (Abhishek Somani, reviewed by Sergey Shelukhin and Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ab7f6f31
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ab7f6f31
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ab7f6f31
Branch: refs/heads/master
Commit: ab7f6f31fb85e49fd41ad1400f6e3aaa59fc95a8
Parents: 9cde56d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Jan 20 14:17:13 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Jan 20 14:17:33 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ab7f6f31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index fb7a6b2..dd53afa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -455,7 +455,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
ReaderKey key = new ReaderKey();
if (isOriginal) {
options = options.clone();
- options.range(options.getOffset(), Long.MAX_VALUE);
pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey,
options);
} else {