Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC
svn commit: r1784224 [7/17] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Feb 24 03:34:37 2017
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.EdgeProper
*/
public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
+ static private int maxTaskCount;
static final double DEFAULT_FLATTEN_FACTOR = 10;
static final double DEFAULT_FILTER_FACTOR = 0.7;
static final double DEFAULT_LIMIT_FACTOR = 0.1;
@@ -75,8 +76,6 @@ public class TezOperDependencyParallelis
static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
private PigContext pc;
- private int maxTaskCount;
- private long bytesPerReducer;
@Override
public void setPigContext(PigContext pc) {
@@ -95,18 +94,16 @@ public class TezOperDependencyParallelis
maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
- bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
-
- // If we have already estimated parallelism, use that one
- if (tezOper.getEstimatedParallelism() != -1) {
- return tezOper.getEstimatedParallelism();
- }
-
// If parallelism is set explicitly, respect it
if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
return tezOper.getRequestedParallelism();
}
+ // If we have already estimated parallelism, use that one
+ if (tezOper.getEstimatedParallelism()!=-1) {
+ return tezOper.getEstimatedParallelism();
+ }
+
List<TezOperator> preds = plan.getPredecessors(tezOper);
if (preds==null) {
throw new IOException("Cannot estimate parallelism for source vertex");
@@ -133,12 +130,6 @@ public class TezOperDependencyParallelis
boolean applyFactor = !tezOper.isUnion();
if (!pred.isVertexGroup() && applyFactor) {
predParallelism = predParallelism * pred.getParallelismFactor(tezOper);
- if (pred.getTotalInputFilesSize() > 0) {
- // Estimate similar to mapreduce and use the maximum of two
- int parallelismBySize = (int) Math.ceil((double) pred
- .getTotalInputFilesSize() / bytesPerReducer);
- predParallelism = Math.max(predParallelism, parallelismBySize);
- }
}
estimatedParallelism += predParallelism;
}
@@ -166,7 +157,9 @@ public class TezOperDependencyParallelis
}
if (roundedEstimatedParallelism == 0) {
- roundedEstimatedParallelism = 1; // We need to produce empty output file
+ throw new IOException("Estimated parallelism for "
+ + tezOper.getOperatorKey().toString()
+ + " is 0 which is unexpected");
}
return roundedEstimatedParallelism;
@@ -203,7 +196,7 @@ public class TezOperDependencyParallelis
if (successor != null) {
// Map side combiner
TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
- if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) {
+ if (!edge.combinePlan.isEmpty()) {
if (successor.isDistinct()) {
factor = DEFAULT_DISTINCT_FACTOR;
} else {
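
For orientation, a minimal Java sketch of the dependency-based estimate this class computes after the hunks above: each non-vertex-group predecessor contributes its parallelism scaled by an operation factor (filter 0.7, limit 0.1, and so on), the sum is rounded up, a zero result now throws instead of defaulting to 1, and the final value is capped at maxTaskCount. The Pred type and its fields are hypothetical stand-ins for the real TezOperator accessors, not part of this commit.

    import java.io.IOException;
    import java.util.List;

    // Hypothetical stand-in for TezOperator; only the fields the loop needs.
    class Pred {
        int parallelism;       // predecessor's estimated/requested parallelism
        double factor;         // e.g. DEFAULT_FILTER_FACTOR = 0.7
        boolean isVertexGroup; // vertex-group predecessors are not scaled
    }

    static int estimate(List<Pred> preds, boolean operIsUnion, int maxTaskCount)
            throws IOException {
        if (preds == null) {
            throw new IOException("Cannot estimate parallelism for source vertex");
        }
        double estimated = 0;
        for (Pred pred : preds) {
            double p = pred.parallelism;
            if (!pred.isVertexGroup && !operIsUnion) {
                p *= pred.factor;               // scale by the operation factor
            }
            estimated += p;
        }
        int rounded = (int) Math.ceil(estimated);
        if (rounded == 0) {
            throw new IOException("Estimated parallelism is 0 which is unexpected");
        }
        return Math.min(rounded, maxTaskCount); // never exceed the configured cap
    }
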
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Feb 24 03:34:37 2017
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigConfiguration;
-import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -45,7 +44,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -54,6 +52,7 @@ import org.apache.pig.builtin.AvroStorag
import org.apache.pig.builtin.JsonStorage;
import org.apache.pig.builtin.OrcStorage;
import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -109,12 +108,6 @@ public class UnionOptimizer extends TezO
if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
return false;
}
-
- // If user has specified a PARALLEL clause with the union operator
- // turn off union optimization
- if (tezOp.getRequestedParallelism() != -1) {
- return false;
- }
// Two vertices separately ranking with 1 to n and writing to output directly
// will make each rank repeate twice which is wrong. Rank always needs to be
// done from single vertex to have the counting correct.
@@ -127,25 +120,10 @@ public class UnionOptimizer extends TezO
public static boolean isOptimizableStoreFunc(TezOperator tezOp,
List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
throws VisitorException {
- List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
-
- for (POStoreTez store : stores) {
- String name = store.getStoreFunc().getClass().getName();
- if (store.getStoreFunc() instanceof StoreFunc) {
- StoreFunc func = (StoreFunc) store.getStoreFunc();
- if (func.supportsParallelWriteToStoreLocation() != null) {
- if (func.supportsParallelWriteToStoreLocation()) {
- continue;
- } else {
- LOG.warn(name + " does not support union optimization."
- + " Disabling it. There will be some performance degradation.");
- return false;
- }
- }
- }
- // If StoreFunc does not explicitly state support, then check supported and
- // unsupported config settings.
- if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
+ if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
+ List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
+ for (POStoreTez store : stores) {
+ String name = store.getStoreFunc().getClass().getName();
if (unsupportedStoreFuncs != null
&& unsupportedStoreFuncs.contains(name)) {
return false;
@@ -259,23 +237,8 @@ public class UnionOptimizer extends TezO
for (TezOperator succ : successors) {
if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) {
existingVertexGroup = succ;
- break;
- }
- }
- }
- if (existingVertexGroup == null) {
- // In the case of union + split + union + store, the different stores in the Split
- // will be writing to same location after second union operator is optimized.
- // So while optimizing the first union, we should just make it write to one vertex group
- for (int j = 0; j < i; j++) {
- if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) {
- storeVertexGroupOps[i] = storeVertexGroupOps[j];
- break;
}
}
- if (storeVertexGroupOps[i] != null) {
- continue;
- }
}
if (existingVertexGroup != null) {
storeVertexGroupOps[i] = existingVertexGroup;
@@ -307,15 +270,6 @@ public class UnionOptimizer extends TezO
TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
String[] newOutputKeys = new String[unionOutputKeys.size()];
for (int i=0; i < outputVertexGroupOps.length; i++) {
- for (int j = 0; j < i; j++) {
- if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
- outputVertexGroupOps[i] = outputVertexGroupOps[j];
- break;
- }
- }
- if (outputVertexGroupOps[i] != null) {
- continue;
- }
outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
@@ -561,24 +515,15 @@ public class UnionOptimizer extends TezO
// Connect predecessor to the storeVertexGroups
int i = 0;
for (TezOperator storeVertexGroup : storeVertexGroupOps) {
- // Skip connecting if they are already connected. Can happen in case of
- // union + split + union + store. Because of the split all the stores
- // will be writing to same location
- List<OperatorKey> inputs = storeVertexGroup.getVertexGroupInfo().getInputs();
- if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
- tezPlan.connect(pred, storeVertexGroup);
- }
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
storeVertexGroup.getOperatorKey());
+ tezPlan.connect(pred, storeVertexGroup);
}
for (TezOperator outputVertexGroup : outputVertexGroupOps) {
- List<OperatorKey> inputs = outputVertexGroup.getVertexGroupInfo().getInputs();
- if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
- tezPlan.connect(pred, outputVertexGroup);
- }
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+ tezPlan.connect(pred, outputVertexGroup);
}
copyOperatorProperties(pred, unionOp);
@@ -623,7 +568,7 @@ public class UnionOptimizer extends TezO
// more union predecessors. Change it to SCATTER_GATHER
if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
edge.dataMovementType = DataMovementType.SCATTER_GATHER;
- edge.partitionerClass = HashValuePartitioner.class;
+ edge.partitionerClass = RoundRobinPartitioner.class;
edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Fri Feb 24 03:34:37 2017
@@ -17,25 +17,23 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@@ -48,13 +46,8 @@ import com.google.common.collect.Lists;
public class PartitionerDefinedVertexManager extends VertexManagerPlugin {
private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class);
- private volatile boolean parallelismSet;
+ private boolean isParallelismSet = false;
private int dynamicParallelism = -1;
- private int numConfiguredSources;
- private int numSources = -1;
- private volatile boolean configured;
- private volatile boolean started;
- private volatile boolean scheduled;
public PartitionerDefinedVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -62,31 +55,7 @@ public class PartitionerDefinedVertexMan
@Override
public void initialize() {
- // this will prevent vertex from starting until we notify we are done
- getContext().vertexReconfigurationPlanned();
- parallelismSet = false;
- numConfiguredSources = 0;
- configured = false;
- started = false;
- numSources = getContext().getInputVertexEdgeProperties().size();
- // wait for sources and self to start
- Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
- for (String entry : edges.keySet()) {
- getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
- }
- }
-
- @Override
- public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate)
- throws Exception {
- numConfiguredSources++;
- LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
- + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
- + " needed: " + numSources);
- Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName());
- if (numConfiguredSources == numSources) {
- configure();
- }
+ // Nothing to do
}
@Override
@@ -104,9 +73,10 @@ public class PartitionerDefinedVertexMan
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
// There could be multiple partition vertex sending VertexManagerEvent
// Only need to setVertexParallelism once
- if (parallelismSet) {
+ if (isParallelismSet) {
return;
}
+ isParallelismSet = true;
// Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput
if (vmEvent.getUserPayload().limit()==4) {
dynamicParallelism = vmEvent.getUserPayload().getInt();
@@ -126,50 +96,18 @@ public class PartitionerDefinedVertexMan
edgeManagers.put(entry.getKey(), edge);
}
getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers);
- parallelismSet = true;
- configure();
- }
- }
- }
-
- private void configure() {
- if(parallelismSet && (numSources == numConfiguredSources)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Done reconfiguring vertex " + getContext().getVertexName());
}
- getContext().doneReconfiguringVertex();
- configured = true;
- trySchedulingTasks();
}
}
- private synchronized void trySchedulingTasks() {
- if (configured && started && !scheduled) {
- LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName());
+ @Override
+ public void onVertexStarted(Map<String, List<Integer>> completions) {
+ if (dynamicParallelism != -1) {
List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
- for (int i = 0; i < dynamicParallelism; ++i) {
+ for (int i=0; i<dynamicParallelism; ++i) {
tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
}
getContext().scheduleVertexTasks(tasksToStart);
- scheduled = true;
}
}
-
- @Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
- // This vertex manager will be getting the following calls
- // 1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex
- // 2) onVertexStateUpdated - Vertex CONFIGURED status updates from
- // - Order by Partitioner vertex (1-1) in case of Order by
- // - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin
- // 3) onVertexStarted
- // Calls 2) and 3) can happen in any order. So we should schedule tasks
- // only after start is called and configuration is also complete
- started = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex start received for " + getContext().getVertexName());
- }
- trySchedulingTasks();
- }
-
}
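
The manager above treats a VertexManagerEvent payload of exactly four bytes as a raw int carrying the dynamic parallelism (the limit()==4 check in onVertexManagerEventReceived). A minimal sketch of that convention with java.nio.ByteBuffer; the ParallelismPayload helper is hypothetical, and the producing side is an assumption based only on that check.

    import java.nio.ByteBuffer;

    class ParallelismPayload {
        // Build a payload whose limit() is exactly 4, as the check above expects.
        static ByteBuffer encode(int parallelism) {
            ByteBuffer buf = ByteBuffer.allocate(4);
            buf.putInt(parallelism);
            buf.flip();              // position -> 0, limit -> 4
            return buf;
        }

        // Reverse of encode(): read the int back out.
        static int decode(ByteBuffer payload) {
            return payload.getInt();
        }
    }
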
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java Fri Feb 24 03:34:37 2017
@@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage
conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
- pc = (PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT));
+ pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext"));
tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan"));
TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan);
try {
@@ -81,10 +81,9 @@ public class PigGraceShuffleVertexManage
throw new TezUncheckedException(e);
}
TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
-
+
// Collect grandparents of the vertex
- Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() {
- @Override
+ Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() {
public String apply(TezOperator op) { return op.getOperatorKey().toString(); }
};
grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString);
@@ -136,7 +135,7 @@ public class PigGraceShuffleVertexManage
// Now one of the predecessor is about to start, we need to make a decision now
if (anyPredAboutToStart) {
// All grandparents finished, start parents with right parallelism
-
+
for (TezOperator pred : preds) {
if (pred.getRequestedParallelism()==-1) {
List<TezOperator> predPreds = tezPlan.getPredecessors(pred);
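
The hunk above reads the PigContext back out of the job configuration under the key "pig.pigContext". A minimal sketch of the ObjectSerializer round trip behind that call; the submit-time half is an assumption, not shown in this diff.

    // Submit side (assumed): serialize the PigContext into the job conf as a String.
    String blob = ObjectSerializer.serialize(pigContext);
    conf.set("pig.pigContext", blob);

    // Task side (as in the hunk above): deserialize it back.
    PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
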
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Fri Feb 24 03:34:37 2017
@@ -25,7 +25,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -33,7 +32,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.JVMReuseImpl;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
@@ -41,7 +39,6 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -56,7 +53,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -136,11 +132,7 @@ public class PigProcessor extends Abstra
SpillableMemoryManager.getInstance().configure(conf);
PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
.deserialize(conf.get("udf.import.list")));
- Properties log4jProperties = (Properties) ObjectSerializer
- .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
- if (log4jProperties != null) {
- PropertyConfigurator.configure(log4jProperties);
- }
+ PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
// To determine front-end in UDFContext
conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier());
@@ -159,12 +151,6 @@ public class PigProcessor extends Abstra
conf.setInt(JobContext.TASK_PARTITION,
taskAttemptId.getTaskID().getId());
conf.set(JobContext.ID, taskAttemptId.getJobID().toString());
- if (conf.get(PigInputFormat.PIG_INPUT_LIMITS) != null) {
- // Has Load and is a root vertex
- conf.setInt(JobContext.NUM_MAPS, getContext().getVertexParallelism());
- } else {
- conf.setInt(JobContext.NUM_REDUCES, getContext().getVertexParallelism());
- }
conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex()));
UDFContext.getUDFContext().addJobConf(conf);
@@ -172,7 +158,7 @@ public class PigProcessor extends Abstra
String execPlanString = conf.get(PLAN);
execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString);
- SchemaTupleBackend.initialize(conf);
+ SchemaTupleBackend.initialize(conf, pc);
PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new org.apache.hadoop.mapreduce.JobID());
// Set the job conf as a thread-local member of PigMapReduce
@@ -181,7 +167,7 @@ public class PigProcessor extends Abstra
Utils.setDefaultTimeZone(conf);
- boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new TezTaskContext(getContext()));
pigHadoopLogger = PigHadoopLogger.getInstance();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java Fri Feb 24 03:34:37 2017
@@ -43,15 +43,6 @@ public interface TezInput {
*/
public void addInputsToSkip(Set<String> inputsToSkip);
- /**
- * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch
- * the input so that all inputs are fetched and memory released before memory is allocated
- * for outputs
- *
- * @param inputs available inputs
- * @param conf configuration
- * @throws ExecException
- */
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf) throws ExecException;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Fri Feb 24 03:34:37 2017
@@ -23,7 +23,6 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.data.DataBag;
@@ -31,7 +30,6 @@ import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.util.UDFContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
@@ -66,13 +64,11 @@ public class WeightedRangePartitionerTez
InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
convertToArray(quantilesList);
- long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
- long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
Tuple key = (Tuple) ent.getKey(); // sample item which repeats
float[] probVec = getProbVec((Tuple) ent.getValue());
weightedParts.put(getPigNullableWritable(key),
- new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
+ new DiscreteProbabilitySampleGenerator(probVec));
}
} catch (Exception e) {
throw new RuntimeException(e);
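
The hunk above drops the per-task random seed passed to DiscreteProbabilitySampleGenerator. Before the change, the seed duplicated the 32-bit task-id hash into both halves of a 64-bit long; a small worked example of that bit manipulation (the task attempt id string is made up):

    int h = "attempt_1487903677_0001_r_000003".hashCode(); // hypothetical task id
    long seed = ((long) h << 32) | (h & 0xffffffffL);
    // Upper and lower 32 bits of seed are both h; masking with 0xffffffffL
    // keeps the sign bit of h from smearing across the upper half.
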
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri Feb 24 03:34:37 2017
@@ -50,7 +50,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -103,6 +102,7 @@ public class MRToTezHelper {
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
+ mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
}
@@ -165,7 +165,11 @@ public class MRToTezHelper {
continue;
}
}
- if (key.startsWith("yarn.nodemanager")) {
+ if (key.startsWith("dfs.datanode")) {
+ tezConf.unset(key);
+ } else if (key.startsWith("dfs.namenode")) {
+ tezConf.unset(key);
+ } else if (key.startsWith("yarn.nodemanager")) {
tezConf.unset(key);
} else if (key.startsWith("mapreduce.jobhistory")) {
tezConf.unset(key);
@@ -177,15 +181,20 @@ public class MRToTezHelper {
}
}
- public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) {
+ public static TezConfiguration getDAGAMConfFromMRConf(
+ Configuration tezConf) {
+
+ // Set Tez parameters based on MR parameters.
+ TezConfiguration dagAMConf = new TezConfiguration(tezConf);
+
convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap());
convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
- String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
- if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) {
- env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV)
- : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV);
+ String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
+ if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
+ env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV)
+ : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV);
}
if (env != null) {
@@ -194,23 +203,24 @@ public class MRToTezHelper {
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
org.apache.tez.mapreduce.hadoop.MRHelpers
- .getJavaOptsForMRAM(dagAMConf));
+ .getJavaOptsForMRAM(tezConf));
- String queueName = dagAMConf.get(JobContext.QUEUE_NAME,
+ String queueName = tezConf.get(JobContext.QUEUE_NAME,
YarnConfiguration.DEFAULT_QUEUE_NAME);
dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
- dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+ tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
- dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+ tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
// Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
removeUnwantedSettings(dagAMConf, true);
+ return dagAMConf;
}
/**
@@ -253,14 +263,6 @@ public class MRToTezHelper {
JobControlCompiler.configureCompression(tezConf);
convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap());
removeUnwantedSettings(tezConf, false);
-
- // ShuffleVertexManager Plugin settings
- // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max
- String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
- if (slowStartFraction != null) {
- tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction);
- tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction);
- }
}
/**
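
The signature change above means callers now receive a fresh TezConfiguration instead of mutating one in place. A hedged sketch of the call site after this change; the variable names are assumptions, only the method names come from the diff.

    // Before: MRToTezHelper.translateMRSettingsForTezAM(dagAMConf) mutated in place.
    // After this change the helper builds and returns the AM configuration:
    TezConfiguration dagAMConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
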
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Feb 24 03:34:37 2017
@@ -36,14 +36,13 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
+import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.TOBAG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
@@ -199,8 +198,8 @@ public class TezCompilerUtil {
public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException {
try {
- List<POFRJoinTez> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class);
- for (POFRJoinTez input : inputs) {
+ List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
+ for (TezInput input : inputs) {
if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
return true;
}
@@ -270,7 +269,7 @@ public class TezCompilerUtil {
} else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
- edge.partitionerClass = HashValuePartitioner.class;
+ edge.partitionerClass = RoundRobinPartitioner.class;
}
edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 24 03:34:37 2017
@@ -70,7 +70,7 @@ public class MapRedUtil {
private static Log log = LogFactory.getLog(MapRedUtil.class);
private static final TupleFactory tf = TupleFactory.getInstance();
- public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY;
+ public static final String FILE_SYSTEM_NAME = "fs.default.name";
/**
* Loads the key distribution sampler file
@@ -301,7 +301,7 @@ public class MapRedUtil {
/**
* Returns the total number of bytes for this file, or if a directory all
* files in the directory.
- *
+ *
* @param fs FileSystem
* @param status FileStatus
* @param max Maximum value of total length that will trigger exit. Many
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Feb 24 03:34:37 2017
@@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.hb
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
@@ -86,6 +86,7 @@ import org.apache.pig.ResourceSchema.Res
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.builtin.Utf8StorageConverter;
@@ -596,9 +597,7 @@ public class HBaseStorage extends LoadFu
new BinaryComparator(colInfo.getColumnName())));
}
}
- if (columnFilters.getFilters().size() != 0) {
- thisColumnGroupFilter.addFilter(columnFilters);
- }
+ thisColumnGroupFilter.addFilter(columnFilters);
allColumnFilters.addFilter(thisColumnGroupFilter);
}
if (allColumnFilters != null) {
@@ -793,35 +792,46 @@ public class HBaseStorage extends LoadFu
public List<String> getShipFiles() {
// Depend on HBase to do the right thing when available, as of HBASE-9165
try {
- Configuration conf = new Configuration();
- TableMapReduceUtil.addHBaseDependencyJars(conf);
- if (conf.get("tmpjars") != null) {
- String[] tmpjars = conf.getStrings("tmpjars");
- List<String> shipFiles = new ArrayList<String>(tmpjars.length);
- for (String tmpjar : tmpjars) {
- shipFiles.add(new URL(tmpjar).getPath());
+ Method addHBaseDependencyJars =
+ TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
+ if (addHBaseDependencyJars != null) {
+ Configuration conf = new Configuration();
+ addHBaseDependencyJars.invoke(null, conf);
+ if (conf.get("tmpjars") != null) {
+ String[] tmpjars = conf.getStrings("tmpjars");
+ List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+ for (String tmpjar : tmpjars) {
+ shipFiles.add(new URL(tmpjar).getPath());
+ }
+ return shipFiles;
}
- return shipFiles;
- }
- } catch (IOException e) {
- if(e instanceof MalformedURLException){
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
- + " had malformed url. Falling back to previous logic.", e);
- }else {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
- + " failed. Falling back to previous logic.", e);
}
+ } catch (NoSuchMethodException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
+ + " Falling back to previous logic.", e);
+ } catch (IllegalAccessException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ + " not permitted. Falling back to previous logic.", e);
+ } catch (InvocationTargetException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ + " failed. Falling back to previous logic.", e);
+ } catch (MalformedURLException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+ + " had malformed url. Falling back to previous logic.", e);
}
List<Class> classList = new ArrayList<Class>();
classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
+ if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava
+ classList.add(com.google.common.collect.Lists.class); // guava
+ }
classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
// Additional jars that are specific to v0.95.0+
addClassToList("org.cloudera.htrace.Trace", classList); // htrace
addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
- addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
+ addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
return FuncUtils.getShipFiles(classList);
}
@@ -872,13 +882,27 @@ public class HBaseStorage extends LoadFu
}
if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
+ // Will not be entering this block for 0.20.2 as it has no security.
try {
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- if (currentUser.hasKerberosCredentials()) {
- TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
+ // getCurrentUser method is not public in 0.20.2
+ Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
+ UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
+ // hasKerberosCredentials method not available in 0.20.2
+ Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
+ boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null);
+ if (hasKerberosCredentials) {
+ // Class and method are available only from 0.92 security release
+ Class tokenUtilClass = Class
+ .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
+ Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] {
+ Configuration.class, UserGroupInformation.class, Job.class });
+ m3.invoke(null, new Object[] { hbaseConf, currentUser, job });
} else {
LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
}
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("Failure loading TokenUtil class, "
+ + "is secure RPC available?", cnfe);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Fri Feb 24 03:34:37 2017
@@ -35,7 +35,6 @@ import org.apache.pig.FilterFunc;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
/**
* Use a Bloom filter build previously by BuildBloom. You would first
@@ -55,36 +54,14 @@ import org.apache.pig.data.TupleFactory;
* C = filter B by bloom(z);
* D = join C by z, A by x;
* It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
- *
- * You can also pass the Bloom filter from BuildBloom directly to Bloom UDF
- * as a scalar instead of storing it to file and loading again. This is simpler
- * if the Bloom filter will not be reused and needs to be discarded after the
- * run of the script.
- *
- * define bb BuildBloom('jenkins', '100', '0.1');
- * A = load 'foo' as (x, y);
- * B = group A all;
- * C = foreach B generate bb(A.x) as bloomfilter;
- * D = load 'bar' as (z);
- * E = filter D by Bloom(C.bloomfilter, z);
- * F = join E by z, A by x;
*/
public class Bloom extends FilterFunc {
- private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
private String bloomFile;
- private BloomFilter filter = null;
+ public BloomFilter filter = null;
- public Bloom() {
- }
-
- /**
- * The filename containing the serialized Bloom filter. If filename is null
- * or the no-arg constructor is used, then the bloomfilter bytearray which
- * is the output of BuildBloom should be passed as the first argument to the UDF
- *
- * @param filename file containing the serialized Bloom filter
+ /**
+ * @param filename file containing the serialized Bloom filter
*/
public Bloom(String filename) {
bloomFile = filename;
@@ -93,25 +70,11 @@ public class Bloom extends FilterFunc {
@Override
public Boolean exec(Tuple input) throws IOException {
if (filter == null) {
- init(input);
+ init();
}
byte[] b;
- if (bloomFile == null) {
- // The first one is the bloom filter. Skip that
- if (input.size() == 2) {
- b = DataType.toBytes(input.get(1));
- } else {
- List<Object> inputList = input.getAll();
- Tuple tuple = mTupleFactory.newTupleNoCopy(inputList.subList(1, inputList.size()));
- b = DataType.toBytes(tuple, DataType.TUPLE);
- }
- } else {
- if (input.size() == 1) {
- b = DataType.toBytes(input.get(0));
- } else {
- b = DataType.toBytes(input, DataType.TUPLE);
- }
- }
+ if (input.size() == 1) b = DataType.toBytes(input.get(0));
+ else b = DataType.toBytes(input, DataType.TUPLE);
Key k = new Key(b);
return filter.membershipTest(k);
@@ -119,46 +82,34 @@ public class Bloom extends FilterFunc {
@Override
public List<String> getCacheFiles() {
- if (bloomFile != null) {
- List<String> list = new ArrayList<String>(1);
- // We were passed the name of the file on HDFS. Append a
- // name for the file on the task node.
- try {
- list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return list;
+ List<String> list = new ArrayList<String>(1);
+ // We were passed the name of the file on HDFS. Append a
+ // name for the file on the task node.
+ try {
+ list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return null;
+ return list;
}
- private void init(Tuple input) throws IOException {
- if (bloomFile == null) {
- if (input.get(0) instanceof DataByteArray) {
- filter = BuildBloomBase.bloomIn((DataByteArray) input.get(0));
- } else {
- throw new IllegalArgumentException("The first argument to the Bloom UDF should be"
- + " the bloom filter if a bloom file is not specified in the constructor");
- }
- } else {
- filter = new BloomFilter();
- String dir = "./" + getFilenameFromPath(bloomFile);
- String[] partFiles = new File(dir)
- .list(new FilenameFilter() {
- @Override
- public boolean accept(File current, String name) {
- return name.startsWith("part");
- }
- });
-
- String dcFile = dir + "/" + partFiles[0];
- DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
- try {
- filter.readFields(dis);
- } finally {
- dis.close();
- }
+ private void init() throws IOException {
+ filter = new BloomFilter();
+ String dir = "./" + getFilenameFromPath(bloomFile);
+ String[] partFiles = new File(dir)
+ .list(new FilenameFilter() {
+ @Override
+ public boolean accept(File current, String name) {
+ return name.startsWith("part");
+ }
+ });
+
+ String dcFile = dir + "/" + partFiles[0];
+ DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
+ try {
+ filter.readFields(dis);
+ } finally {
+ dis.close();
}
}
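
Bloom.exec() above reduces each input to bytes and runs a membership test against the deserialized filter. A self-contained sketch of that test using the same org.apache.hadoop.util.bloom classes; the filter sizing here is arbitrary, not taken from this commit.

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    // vector size, number of hash functions, hash type
    BloomFilter filter = new BloomFilter(1000, 4, Hash.JENKINS_HASH);
    filter.add(new Key("pig".getBytes()));
    boolean hit  = filter.membershipTest(new Key("pig".getBytes()));   // true
    boolean miss = filter.membershipTest(new Key("spark".getBytes())); // false with high probability
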
Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java Fri Feb 24 03:34:37 2017
@@ -18,15 +18,16 @@
package org.apache.pig.builtin;
+import java.io.IOException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.hash.Hash;
+
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
@@ -46,7 +47,7 @@ public abstract class BuildBloomBase<T>
protected BuildBloomBase() {
}
- /**
+ /**
* @param hashType type of the hashing function (see
* {@link org.apache.hadoop.util.hash.Hash}).
* @param mode Will be ignored, though by convention it should be
@@ -63,7 +64,7 @@ public abstract class BuildBloomBase<T>
hType = convertHashType(hashType);
}
- /**
+ /**
* @param hashType type of the hashing function (see
* {@link org.apache.hadoop.util.hash.Hash}).
* @param numElements The number of distinct elements expected to be
@@ -103,7 +104,7 @@ public abstract class BuildBloomBase<T>
return new DataByteArray(baos.toByteArray());
}
- public static BloomFilter bloomIn(DataByteArray b) throws IOException {
+ protected BloomFilter bloomIn(DataByteArray b) throws IOException {
DataInputStream dis = new DataInputStream(new
ByteArrayInputStream(b.get()));
BloomFilter f = new BloomFilter();
Modified: pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java Fri Feb 24 03:34:37 2017
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.Counters;
@@ -181,9 +180,20 @@ abstract class HiveUDFBase extends EvalF
@Override
public List<String> getShipFiles() {
+ String hadoopVersion = "20S";
+ if (Utils.isHadoop23() || Utils.isHadoop2()) {
+ hadoopVersion = "23";
+ }
+ Class hadoopVersionShimsClass;
+ try {
+ hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+ hadoopVersion + "Shims");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
+ }
List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
- PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
- Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
+ PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+ hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
return files;
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Fri Feb 24 03:34:37 2017
@@ -56,7 +56,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -390,8 +389,20 @@ public class OrcStorage extends LoadFunc
@Override
public List<String> getShipFiles() {
+ List<String> cacheFiles = new ArrayList<String>();
+ String hadoopVersion = "20S";
+ if (Utils.isHadoop23() || Utils.isHadoop2()) {
+ hadoopVersion = "23";
+ }
+ Class hadoopVersionShimsClass;
+ try {
+ hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+ hadoopVersion + "Shims");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
+ }
Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
- org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
+ org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
Input.class};
return FuncUtils.getShipFiles(classList);
}
@@ -445,7 +456,7 @@ public class OrcStorage extends LoadFunc
}
private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
- FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration());
+ FileSystem fs = FileSystem.get(job.getConfiguration());
Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs));
if (path == null) {
log.info("Cannot find any ORC files from " + location +
Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Feb 24 03:34:37 2017
@@ -68,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -170,7 +171,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
- Option overwrite = new Option("overwrite", "Overwrites the destination.");
+ Option overwrite = new Option(" ", "Overwrites the destination.");
overwrite.setLongOpt("overwrite");
overwrite.setOptionalArg(true);
overwrite.setArgs(1);
@@ -411,7 +412,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
@Override
public InputFormat getInputFormat() {
if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
- && (!bzipinput_usehadoops) ) {
+ && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
Modified: pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java Fri Feb 24 03:34:37 2017
@@ -17,63 +17,15 @@
*/
package org.apache.pig.builtin;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
-/**
- * This partitioner should be used with extreme caution and only in cases
- * where the order of output records is guaranteed to be same. If the order of
- * output records can vary on retries which is mostly the case, map reruns
- * due to shuffle fetch failures can lead to data being partitioned differently
- * and result in incorrect output due to loss or duplication of data.
- * Refer PIG-5041 for more details.
- *
- * This will be removed in the next release as it is risky to use in most cases.
- */
-@Deprecated
-public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
- implements Configurable {
-
- /**
- * Batch size for round robin partitioning. Batch size number of records
- * will be distributed to each partition in a round robin fashion. Default
- * value is 0 which distributes each record in a circular fashion. Higher
- * number for batch size can be used to increase probability of keeping
- * similar records in the same partition if output is already sorted and get
- * better compression.
- */
- public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size";
- private int num = -1;
- private int batchSize = 0;
- private int currentBatchCount = 0;
- private Configuration conf;
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
+ private int num = 0;
@Override
public int getPartition(Writable key, Writable value, int numPartitions) {
- if (batchSize > 0) {
- if (currentBatchCount == 0) {
- num = ++num % numPartitions;
- }
- if (++currentBatchCount == batchSize) {
- currentBatchCount = 0;
- }
- } else {
- num = ++num % numPartitions;
- }
+ num = ++num % numPartitions;
return num;
}
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
}
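
With the batch-size and Configurable machinery removed, the partitioner reduces to the pre-increment-modulo cycle shown above. A toy driver illustrating the resulting distribution (Text keys/values stand in for the real shuffle records):

    import org.apache.hadoop.io.Text;
    import org.apache.pig.builtin.RoundRobinPartitioner;

    public class RoundRobinDemo {
        public static void main(String[] args) {
            RoundRobinPartitioner p = new RoundRobinPartitioner();
            int numPartitions = 3;
            for (int i = 0; i < 7; i++) {
                int part = p.getPartition(new Text("k" + i), new Text("v"),
                        numPartitions);
                // Prints partitions 1, 2, 0, 1, 2, 0, 1: num starts at 0 and
                // is pre-incremented before the modulo.
                System.out.println("record " + i + " -> partition " + part);
            }
        }
    }
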
Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Feb 24 03:34:37 2017
@@ -37,6 +37,7 @@ import org.apache.pig.ResourceSchema.Res
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -258,7 +259,8 @@ public class TextLoader extends LoadFunc
@Override
public InputFormat getInputFormat() {
if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
- && !bzipinput_usehadoops ) {
+ && !HadoopShims.isHadoopYARN()
+ && !bzipinput_usehadoops ) {
mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Fri Feb 24 03:34:37 2017
@@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag
}
@SuppressWarnings("rawtypes")
- protected void warn(String msg, Enum warningEnum, Throwable e) {
+ protected void warn(String msg, Enum warningEnum, Exception e) {
pigLogger = PhysicalOperator.getPigLogger();
if(pigLogger != null) {
pigLogger.warn(this, msg, warningEnum);
Modified: pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java Fri Feb 24 03:34:37 2017
@@ -22,11 +22,11 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.io.FileNotFoundException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,12 +42,12 @@ import org.apache.pig.PigWarning;
public class DefaultDataBag extends DefaultAbstractBag {
/**
- *
+ *
*/
private static final long serialVersionUID = 2L;
private static final Log log = LogFactory.getLog(DefaultDataBag.class);
-
+
private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance();
public DefaultDataBag() {
@@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa
public boolean isSorted() {
return false;
}
-
+
@Override
public boolean isDistinct() {
return false;
}
-
+
@Override
public Iterator<Tuple> iterator() {
return new DefaultDataBagIterator();
@@ -110,15 +110,12 @@ public class DefaultDataBag extends Defa
if ((spilled & 0x3fff) == 0) reportProgress();
}
out.flush();
- out.close();
- out = null;
- mContents.clear();
- } catch (Throwable e) {
+ } catch (IOException ioe) {
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
return 0;
} finally {
if (out != null) {
@@ -129,6 +126,7 @@ public class DefaultDataBag extends Defa
}
}
}
+ mContents.clear();
}
// Increment the spill count
incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
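
The reworked spill path above (DistinctDataBag and SortedDataBag below get the identical treatment) narrows the catch from Throwable to IOException, leaves closing the stream to the finally block, and clears the in-memory contents only once the write has succeeded. A compact sketch of that control flow, with a String list standing in for the tuple contents:

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class SpillSketch {
        private final List<String> mContents = new ArrayList<String>();
        private final List<File> mSpillFiles = new ArrayList<File>();

        long spill() throws IOException {
            File file = File.createTempFile("pigbag", null);
            mSpillFiles.add(file);
            DataOutputStream out = null;
            long spilled = 0;
            try {
                out = new DataOutputStream(new BufferedOutputStream(
                        new FileOutputStream(file)));
                for (String t : mContents) {
                    out.writeUTF(t);
                    spilled++;
                }
                out.flush();
            } catch (IOException ioe) {
                // The file registered above was never written successfully,
                // so drop it from the spill list and report nothing spilled.
                mSpillFiles.remove(mSpillFiles.size() - 1);
                return 0;
            } finally {
                if (out != null) {
                    try {
                        out.close();
                    } catch (IOException e) {
                        // matches the hunk: log the failure and fall through
                    }
                }
            }
            mContents.clear(); // only reached after a successful write
            return spilled;
        }
    }
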
@@ -158,7 +156,7 @@ public class DefaultDataBag extends Defa
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
// Once we call hasNext(), set the flag, so we can call hasNext() repeatedly without fetching the next tuple
if (hasCachedTuple)
return (mBuf != null);
@@ -211,7 +209,7 @@ public class DefaultDataBag extends Defa
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- String msg = "Unable to find our spill file.";
+ String msg = "Unable to find our spill file.";
log.fatal(msg, fnfe);
throw new RuntimeException(msg, fnfe);
}
@@ -225,7 +223,7 @@ public class DefaultDataBag extends Defa
log.fatal(msg, eof);
throw new RuntimeException(msg, eof);
} catch (IOException ioe) {
- String msg = "Unable to read our spill file.";
+ String msg = "Unable to read our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
}
@@ -261,7 +259,7 @@ public class DefaultDataBag extends Defa
log.warn("Failed to close spill file.", e);
}
} catch (IOException ioe) {
- String msg = "Unable to read our spill file.";
+ String msg = "Unable to read our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
}
Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Fri Feb 24 03:34:37 2017
@@ -67,17 +67,17 @@ public class DistinctDataBag extends Def
public boolean isSorted() {
return false;
}
-
+
@Override
public boolean isDistinct() {
return true;
}
-
-
+
+
@Override
public long size() {
if (mSpillFiles != null && mSpillFiles.size() > 0){
- //We need to recalculate size to guarantee a count of unique
+ //We need to recalculate size to guarantee a count of unique
//entries including those on disk
Iterator<Tuple> iter = iterator();
int newSize = 0;
@@ -85,7 +85,7 @@ public class DistinctDataBag extends Def
newSize++;
iter.next();
}
-
+
synchronized(mContents) {
//we don't want adds to change our numbers
//the lock may need to cover more of the method
@@ -94,8 +94,8 @@ public class DistinctDataBag extends Def
}
return mSize;
}
-
-
+
+
@Override
public Iterator<Tuple> iterator() {
return new DistinctDataBagIterator();
@@ -155,15 +155,12 @@ public class DistinctDataBag extends Def
}
}
out.flush();
- out.close();
- out = null;
- mContents.clear();
- } catch (Throwable e) {
+ } catch (IOException ioe) {
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
return 0;
} finally {
if (out != null) {
@@ -174,6 +171,7 @@ public class DistinctDataBag extends Def
}
}
}
+ mContents.clear();
}
// Increment the spill count
incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -210,7 +208,7 @@ public class DistinctDataBag extends Def
@Override
public int hashCode() {
- return tuple.hashCode();
+ return tuple.hashCode();
}
}
@@ -239,7 +237,7 @@ public class DistinctDataBag extends Def
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
// See if we can find a tuple. If so, buffer it.
mBuf = next();
return mBuf != null;
@@ -297,7 +295,7 @@ public class DistinctDataBag extends Def
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- String msg = "Unable to find our spill file.";
+ String msg = "Unable to find our spill file.";
log.fatal(msg, fnfe);
throw new RuntimeException(msg, fnfe);
}
@@ -348,7 +346,7 @@ public class DistinctDataBag extends Def
Iterator<File> i = mSpillFiles.iterator();
while (i.hasNext()) {
try {
- DataInputStream in =
+ DataInputStream in =
new DataInputStream(new BufferedInputStream(
new FileInputStream(i.next())));
mStreams.add(in);
@@ -504,7 +502,7 @@ public class DistinctDataBag extends Def
addToQueue(null, mStreams.size() - 1);
i.remove();
filesToDelete.add(f);
-
+
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// never happen.
@@ -547,7 +545,7 @@ public class DistinctDataBag extends Def
log.warn("Failed to delete spill file: " + f.getPath());
}
}
-
+
// clear the list, so that finalize does not delete any files,
// when mSpillFiles is assigned a new value
mSpillFiles.clear();
@@ -562,6 +560,6 @@ public class DistinctDataBag extends Def
}
}
}
-
+
}
Modified: pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java Fri Feb 24 03:34:37 2017
@@ -50,9 +50,6 @@ public class ReadOnceBag implements Data
*/
private static final long serialVersionUID = 2L;
- public ReadOnceBag() {
- }
-
/**
* This constructor creates a bag out of an existing iterator
* of tuples by taking ownership of the iterator and NOT
Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Fri Feb 24 03:34:37 2017
@@ -39,7 +39,6 @@ import org.apache.pig.data.utils.Structu
import org.apache.pig.data.utils.StructuresHelper.Triple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.Utils;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -273,20 +272,14 @@ public class SchemaTupleBackend {
private static SchemaTupleBackend stb;
public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
- if (stb != null) {
- SchemaTupleFrontend.lazyReset(pigContext);
- }
- initialize(jConf, pigContext.getExecType().isLocal());
+ initialize(jConf, pigContext, pigContext.getExecType().isLocal());
}
- public static void initialize(Configuration jConf) throws IOException {
- initialize(jConf, Utils.isLocal(jConf));
- }
-
- public static void initialize(Configuration jConf, boolean isLocal) throws IOException {
+ public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
if (stb != null) {
LOG.warn("SchemaTupleBackend has already been initialized");
} else {
+ SchemaTupleFrontend.lazyReset(pigContext);
SchemaTupleFrontend.reset();
SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal);
stbInstance.copyAndResolve();
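
The SchemaTupleBackend change collapses the initialize() overloads into one path: SchemaTupleFrontend.lazyReset()/reset() now run on first initialization, inside the guard, rather than before delegation. Structurally this is an initialize-once singleton; a stripped-down sketch of the pattern (names other than the log message are illustrative):

    // Initialize-once pattern mirroring the hunk: later calls only warn.
    final class Backend {
        private static Backend stb;

        static void initialize(boolean isLocal) {
            if (stb != null) {
                System.err.println("SchemaTupleBackend has already been initialized");
                return;
            }
            // reset front-end state, then build and publish the singleton
            stb = new Backend(isLocal);
        }

        private final boolean isLocal;

        private Backend(boolean isLocal) {
            this.isLocal = isLocal;
        }
    }
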
Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Fri Feb 24 03:34:37 2017
@@ -32,7 +32,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.PriorityQueue;
-
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigCounters;
@@ -44,14 +44,14 @@ import org.apache.pig.PigWarning;
* stored unsorted as it comes in, and only sorted when it is time to dump
* it to a file or when the first iterator is requested. Experimentation
* found this to be faster than storing it sorted to begin with.
- *
+ *
* We allow a user defined comparator, but provide a default comparator in
* cases where the user doesn't specify one.
*/
public class SortedDataBag extends DefaultAbstractBag{
/**
- *
+ *
*/
private static final long serialVersionUID = 2L;
@@ -76,7 +76,7 @@ public class SortedDataBag extends Defau
@Override
public int hashCode() {
- return 42;
+ return 42;
}
}
@@ -95,12 +95,12 @@ public class SortedDataBag extends Defau
public boolean isSorted() {
return true;
}
-
+
@Override
public boolean isDistinct() {
return false;
}
-
+
@Override
public Iterator<Tuple> iterator() {
return new SortedDataBagIterator();
@@ -145,15 +145,12 @@ public class SortedDataBag extends Defau
if ((spilled & 0x3fff) == 0) reportProgress();
}
out.flush();
- out.close();
- out = null;
- mContents.clear();
- } catch (Throwable e) {
+ } catch (IOException ioe) {
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
warn(
- "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+ "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
return 0;
} finally {
if (out != null) {
@@ -164,6 +161,7 @@ public class SortedDataBag extends Defau
}
}
}
+ mContents.clear();
}
// Increment the spill count
incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -205,7 +203,7 @@ public class SortedDataBag extends Defau
@Override
public int hashCode() {
- return tuple.hashCode();
+ return tuple.hashCode();
}
}
@@ -230,7 +228,7 @@ public class SortedDataBag extends Defau
}
@Override
- public boolean hasNext() {
+ public boolean hasNext() {
// See if we can find a tuple. If so, buffer it.
mBuf = next();
return mBuf != null;
@@ -343,7 +341,7 @@ public class SortedDataBag extends Defau
Iterator<File> i = mSpillFiles.iterator();
while (i.hasNext()) {
try {
- DataInputStream in =
+ DataInputStream in =
new DataInputStream(new BufferedInputStream(
new FileInputStream(i.next())));
mStreams.add(in);
@@ -353,7 +351,7 @@ public class SortedDataBag extends Defau
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// never happen.
- String msg = "Unable to find our spill file.";
+ String msg = "Unable to find our spill file.";
log.fatal(msg, fnfe);
throw new RuntimeException(msg, fnfe);
}
@@ -413,7 +411,7 @@ public class SortedDataBag extends Defau
in.close();
}catch(IOException e) {
log.warn("Failed to close spill file.", e);
- }
+ }
mStreams.set(fileNum, null);
} catch (IOException ioe) {
String msg = "Unable to find our spill file.";
@@ -520,7 +518,7 @@ public class SortedDataBag extends Defau
log.warn("Failed to delete spill file: " + f.getPath());
}
}
-
+
// clear the list, so that finalize does not delete any files,
// when mSpillFiles is assigned a new value
mSpillFiles.clear();