Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/12 02:20:22 UTC
svn commit: r1791060 [2/4] - in /pig/branches/spark: ./ bin/
src/docs/src/documentation/content/xdocs/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java Wed Apr 12 02:20:20 2017
@@ -1,137 +1,137 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.spark.converter;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.rdd.RDD;
-
-public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> {
-
- private static final Log LOG = LogFactory.getLog(CounterConverter.class);
-
- @Override
- public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- POCounter poCounter) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, poCounter, 1);
- RDD<Tuple> rdd = predecessors.get(0);
- CounterConverterFunction f = new CounterConverterFunction(poCounter);
- JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true);
-// jRdd = jRdd.cache();
- return jRdd.rdd();
- }
-
- @SuppressWarnings("serial")
- private static class CounterConverterFunction implements
- Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable {
-
- private final POCounter poCounter;
- private long localCount = 1L;
- private long sparkCount = 0L;
-
- private CounterConverterFunction(POCounter poCounter) {
- this.poCounter = poCounter;
- }
-
- @Override
- public Iterator<Tuple> call(Integer index, final
- Iterator<Tuple> input) {
- Tuple inp = null;
- Tuple output = null;
- long sizeBag = 0L;
-
- List<Tuple> listOutput = new ArrayList<Tuple>();
-
- try {
- while (input.hasNext()) {
- inp = input.next();
- output = TupleFactory.getInstance()
- .newTuple(inp.getAll().size() + 3);
-
- for (int i = 0; i < inp.getAll().size(); i++) {
- output.set(i + 3, inp.get(i));
- }
-
- if (poCounter.isRowNumber() || poCounter.isDenseRank()) {
- output.set(2, getLocalCounter());
- incrementSparkCounter();
- incrementLocalCounter();
- } else if (!poCounter.isDenseRank()) {
- int positionBag = inp.getAll().size()-1;
- if (inp.getType(positionBag) == DataType.BAG) {
- sizeBag = ((org.apache.pig.data.DefaultAbstractBag)
- inp.get(positionBag)).size();
- }
-
- output.set(2, getLocalCounter());
-
- addToSparkCounter(sizeBag);
- addToLocalCounter(sizeBag);
- }
-
- output.set(0, index);
- output.set(1, getSparkCounter());
- listOutput.add(output);
- }
- } catch(ExecException e) {
- throw new RuntimeException(e);
- }
-
-
- return listOutput.iterator();
- }
-
- private long getLocalCounter() {
- return localCount;
- }
-
- private long incrementLocalCounter() {
- return localCount++;
- }
-
- private long addToLocalCounter(long amount) {
- return localCount += amount;
- }
-
- private long getSparkCounter() {
- return sparkCount;
- }
-
- private long incrementSparkCounter() {
- return sparkCount++;
- }
-
- private long addToSparkCounter(long amount) {
- return sparkCount += amount;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.rdd.RDD;
+
+public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> {
+
+ private static final Log LOG = LogFactory.getLog(CounterConverter.class);
+
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+ POCounter poCounter) throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, poCounter, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ CounterConverterFunction f = new CounterConverterFunction(poCounter);
+ JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true);
+// jRdd = jRdd.cache();
+ return jRdd.rdd();
+ }
+
+ @SuppressWarnings("serial")
+ private static class CounterConverterFunction implements
+ Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable {
+
+ private final POCounter poCounter;
+ private long localCount = 1L;
+ private long sparkCount = 0L;
+
+ private CounterConverterFunction(POCounter poCounter) {
+ this.poCounter = poCounter;
+ }
+
+ @Override
+ public Iterator<Tuple> call(Integer index, final
+ Iterator<Tuple> input) {
+ Tuple inp = null;
+ Tuple output = null;
+ long sizeBag = 0L;
+
+ List<Tuple> listOutput = new ArrayList<Tuple>();
+
+ try {
+ while (input.hasNext()) {
+ inp = input.next();
+ output = TupleFactory.getInstance()
+ .newTuple(inp.getAll().size() + 3);
+
+ for (int i = 0; i < inp.getAll().size(); i++) {
+ output.set(i + 3, inp.get(i));
+ }
+
+ if (poCounter.isRowNumber() || poCounter.isDenseRank()) {
+ output.set(2, getLocalCounter());
+ incrementSparkCounter();
+ incrementLocalCounter();
+ } else if (!poCounter.isDenseRank()) {
+ int positionBag = inp.getAll().size()-1;
+ if (inp.getType(positionBag) == DataType.BAG) {
+ sizeBag = ((org.apache.pig.data.DefaultAbstractBag)
+ inp.get(positionBag)).size();
+ }
+
+ output.set(2, getLocalCounter());
+
+ addToSparkCounter(sizeBag);
+ addToLocalCounter(sizeBag);
+ }
+
+ output.set(0, index);
+ output.set(1, getSparkCounter());
+ listOutput.add(output);
+ }
+ } catch(ExecException e) {
+ throw new RuntimeException(e);
+ }
+
+
+ return listOutput.iterator();
+ }
+
+ private long getLocalCounter() {
+ return localCount;
+ }
+
+ private long incrementLocalCounter() {
+ return localCount++;
+ }
+
+ private long addToLocalCounter(long amount) {
+ return localCount += amount;
+ }
+
+ private long getSparkCounter() {
+ return sparkCount;
+ }
+
+ private long incrementSparkCounter() {
+ return sparkCount++;
+ }
+
+ private long addToSparkCounter(long amount) {
+ return sparkCount += amount;
+ }
+ }
+}
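
For orientation, CounterConverterFunction above tags each row with three bookkeeping fields ahead of the original ones: the partition index, a running "spark" count, and a task-local rank counter. A minimal sketch of that per-partition numbering in plain Java, without the Spark and Pig types (names and field layout are assumed from the code above):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class PartitionNumberingSketch {
        // Mimics the row-number branch of CounterConverterFunction:
        // output row = [partition index, spark count, local count, original value].
        static List<long[]> numberPartition(int index, Iterator<Long> rows) {
            long localCount = 1L;   // rank counter, starts at 1 as in the converter
            long sparkCount = 0L;   // running per-task row count
            List<long[]> out = new ArrayList<>();
            while (rows.hasNext()) {
                long row = rows.next();
                sparkCount++;
                out.add(new long[]{index, sparkCount, localCount++, row});
            }
            return out;
        }

        public static void main(String[] args) {
            for (long[] r : numberPartition(0, Arrays.asList(10L, 20L).iterator())) {
                System.out.println(Arrays.toString(r));   // [0, 1, 1, 10] then [0, 2, 2, 20]
            }
        }
    }
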
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Wed Apr 12 02:20:20 2017
@@ -21,18 +21,19 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.AbstractFunction2;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.AbstractFunction2;
+
@SuppressWarnings({ "serial" })
public class DistinctConverter implements RDDConverter<Tuple, Tuple, PODistinct> {
private static final Log LOG = LogFactory.getLog(DistinctConverter.class);
@@ -51,7 +52,7 @@ public class DistinctConverter implement
= new PairRDDFunctions<Tuple, Object>(keyValRDD,
SparkUtil.getManifest(Tuple.class),
SparkUtil.getManifest(Object.class), null);
- int parallelism = SparkUtil.getParallelism(predecessors, op);
+ int parallelism = SparkPigContext.get().getParallelism(predecessors, op);
return pairRDDFunctions.reduceByKey(
SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
new MergeValuesFunction())
@@ -66,15 +67,9 @@ public class DistinctConverter implement
Serializable {
@Override
public Tuple2<Tuple, Object> apply(Tuple t) {
- if (LOG.isDebugEnabled())
- LOG.debug("DistinctConverter.ToKeyValueFunction in " + t);
-
Tuple key = t;
Object value = null;
Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
-
- if (LOG.isDebugEnabled())
- LOG.debug("DistinctConverter.ToKeyValueFunction out " + out);
return out;
}
}
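
The distinct implementation above uses the classic shuffle idiom: map each tuple to a (tuple, null) pair, reduceByKey so duplicates collapse onto one key, then keep only the keys. A hedged sketch of the same shape with the Spark Java API, over plain strings rather than Pig tuples:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    public class DistinctSketch {
        public static JavaRDD<String> distinct(JavaRDD<String> rdd, int parallelism) {
            JavaPairRDD<String, Object> keyed =
                    rdd.mapToPair(t -> new Tuple2<>(t, (Object) null)); // value is never read
            return keyed.reduceByKey((a, b) -> a, parallelism)          // duplicates collapse
                        .keys();                                        // back to plain values
        }
    }
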
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Wed Apr 12 02:20:20 2017
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -59,7 +60,7 @@ public class FRJoinConverter implements
Map<String, List<Tuple>> replicatedInputMap = new HashMap<>();
for (String replicatedInput : replicatedInputs) {
- replicatedInputMap.put(replicatedInput, SparkUtil.getBroadcastedVars().get(replicatedInput).value());
+ replicatedInputMap.put(replicatedInput, SparkPigContext.get().getBroadcastedVars().get(replicatedInput).value());
}
poFRJoin.attachInputs(replicatedInputMap);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Wed Apr 12 02:20:20 2017
@@ -23,15 +23,10 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.runtime.AbstractFunction1;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.data.Tuple;
@@ -44,6 +39,12 @@ import org.apache.spark.api.java.functio
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
@SuppressWarnings({ "serial" })
public class GlobalRearrangeConverter implements
RDDConverter<Tuple, Tuple, POGlobalRearrangeSpark> {
@@ -57,7 +58,7 @@ public class GlobalRearrangeConverter im
POGlobalRearrangeSpark op) throws IOException {
SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
op, 0);
- int parallelism = SparkUtil.getParallelism(predecessors,
+ int parallelism = SparkPigContext.get().getParallelism(predecessors,
op);
// TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
@@ -322,7 +323,6 @@ public class GlobalRearrangeConverter im
try {
Tuple tuple = tf.newTuple(3);
tuple.set(0, index);
- tuple.set(1, key);
tuple.set(2, next);
return tuple;
} catch (ExecException e) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Apr 12 02:20:20 2017
@@ -23,18 +23,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.pig.backend.executionengine.ExecException;
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
@@ -44,6 +40,11 @@ import org.apache.pig.impl.io.PigNullabl
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
import scala.runtime.AbstractFunction1;
public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
@@ -54,10 +55,10 @@ public class JoinGroupSparkConverter imp
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException {
SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
op, 0);
- List<POLocalRearrange> lraOps = op.getLraOps();
- POGlobalRearrangeSpark glaOp = op.getGlaOp();
+ List<POLocalRearrange> lraOps = op.getLROps();
+ POGlobalRearrangeSpark glaOp = op.getGROp();
POPackage pkgOp = op.getPkgOp();
- int parallelism = SparkUtil.getParallelism(predecessors, glaOp);
+ int parallelism = SparkPigContext.get().getParallelism(predecessors, glaOp);
List<RDD<Tuple2<IndexedKey, Tuple>>> rddAfterLRA = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
boolean useSecondaryKey = glaOp.isUseSecondaryKey();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Apr 12 02:20:20 2017
@@ -119,7 +119,7 @@ public class LoadConverter implements RD
//create SparkCounter and set it for ToTupleFunction
boolean disableCounter = jobConf.getBoolean("pig.disable.counter", false);
if (!op.isTmpLoad() && !disableCounter) {
- String counterName = SparkStatsUtil.getLoadSparkCounterName(op);
+ String counterName = SparkStatsUtil.getCounterName(op);
SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
if (counterReporter.getCounters() != null) {
counterReporter.getCounters().createCounter(
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Wed Apr 12 02:20:20 2017
@@ -94,8 +94,7 @@ public class PackageConverter implements
.byteValue());
if (LOG.isDebugEnabled())
LOG.debug("Setting index to " + next.get(0) +
- " for tuple " + (Tuple)next.get(2) + " with key " +
- next.get(1));
+ " for tuple " + (Tuple)next.get(1));
return nullableTuple;
} catch (ExecException e) {
throw new RuntimeException(e);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java Wed Apr 12 02:20:20 2017
@@ -1,134 +1,135 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.spark.converter;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import scala.Tuple2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.rdd.RDD;
-
-public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> {
-
- private static final Log LOG = LogFactory.getLog(RankConverter.class);
-
- @Override
- public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
- throws IOException {
- int parallelism = SparkUtil.getParallelism(predecessors, poRank);
- SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
- RDD<Tuple> rdd = predecessors.get(0);
- JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
- .mapToPair(new ToPairRdd());
- JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
- .groupByKey(parallelism);
- JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
- .mapToPair(new IndexCounters());
- JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
- .sortByKey(true, parallelism);
- Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
- JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
- .map(new RankFunction(new HashMap<Integer, Long>(counts)));
- return finalRdd.rdd();
- }
-
- @SuppressWarnings("serial")
- private static class ToPairRdd implements
- PairFunction<Tuple, Integer, Long>, Serializable {
-
- @Override
- public Tuple2<Integer, Long> call(Tuple t) {
- try {
- Integer key = (Integer) t.get(0);
- Long value = (Long) t.get(1);
- return new Tuple2<Integer, Long>(key, value);
- } catch (ExecException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static class IndexCounters implements
- PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>,
- Serializable {
- @Override
- public Tuple2<Integer, Long> call(Tuple2<Integer,
- Iterable<Long>> input) {
- long lastVaue = 0L;
-
- for (Long t : input._2()) {
- lastVaue = (t > lastVaue) ? t : lastVaue;
- }
-
- return new Tuple2<Integer, Long>(input._1(), lastVaue);
- }
- }
-
- @SuppressWarnings("serial")
- private static class RankFunction implements Function<Tuple, Tuple>,
- Serializable {
- private final HashMap<Integer, Long> counts;
-
- private RankFunction(HashMap<Integer, Long> counts) {
- this.counts = counts;
- }
-
- @Override
- public Tuple call(Tuple input) throws Exception {
- Tuple output = TupleFactory.getInstance()
- .newTuple(input.getAll().size() - 2);
-
- for (int i = 1; i < input.getAll().size() - 2; i ++) {
- output.set(i, input.get(i+2));
- }
-
- long offset = calculateOffset((Integer) input.get(0));
- output.set(0, offset + (Long)input.get(2));
- return output;
- }
-
- private long calculateOffset(Integer index) {
- long offset = 0;
-
- if (index > 0) {
- for (int i = 0; i < index; i++) {
- if (counts.containsKey(i)) {
- offset += counts.get(i);
- }
- }
- }
- return offset;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+
+public class RankConverter implements RDDConverter<Tuple, Tuple, PORank> {
+
+ private static final Log LOG = LogFactory.getLog(RankConverter.class);
+
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
+ throws IOException {
+ int parallelism = SparkPigContext.get().getParallelism(predecessors, poRank);
+ SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
+ .mapToPair(new ToPairRdd());
+ JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
+ .groupByKey(parallelism);
+ JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
+ .mapToPair(new IndexCounters());
+ JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
+ .sortByKey(true, parallelism);
+ Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
+ JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
+ .map(new RankFunction(new HashMap<Integer, Long>(counts)));
+ return finalRdd.rdd();
+ }
+
+ @SuppressWarnings("serial")
+ private static class ToPairRdd implements
+ PairFunction<Tuple, Integer, Long>, Serializable {
+
+ @Override
+ public Tuple2<Integer, Long> call(Tuple t) {
+ try {
+ Integer key = (Integer) t.get(0);
+ Long value = (Long) t.get(1);
+ return new Tuple2<Integer, Long>(key, value);
+ } catch (ExecException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static class IndexCounters implements
+ PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>,
+ Serializable {
+ @Override
+ public Tuple2<Integer, Long> call(Tuple2<Integer,
+ Iterable<Long>> input) {
+ long lastValue = 0L;
+
+ for (Long t : input._2()) {
+ lastValue = (t > lastValue) ? t : lastValue;
+ }
+
+ return new Tuple2<Integer, Long>(input._1(), lastValue);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static class RankFunction implements Function<Tuple, Tuple>,
+ Serializable {
+ private final HashMap<Integer, Long> counts;
+
+ private RankFunction(HashMap<Integer, Long> counts) {
+ this.counts = counts;
+ }
+
+ @Override
+ public Tuple call(Tuple input) throws Exception {
+ Tuple output = TupleFactory.getInstance()
+ .newTuple(input.getAll().size() - 2);
+
+ for (int i = 1; i < input.getAll().size() - 2; i ++) {
+ output.set(i, input.get(i+2));
+ }
+
+ long offset = calculateOffset((Integer) input.get(0));
+ output.set(0, offset + (Long)input.get(2));
+ return output;
+ }
+
+ private long calculateOffset(Integer index) {
+ long offset = 0;
+
+ if (index > 0) {
+ for (int i = 0; i < index; i++) {
+ if (counts.containsKey(i)) {
+ offset += counts.get(i);
+ }
+ }
+ }
+ return offset;
+ }
+ }
+}
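
The rank bookkeeping above works in two passes: the collected map holds, per partition index, the highest local counter seen in that partition, and calculateOffset turns it into a global offset by summing the counts of all earlier partitions. A tiny self-contained illustration with made-up counts:

    import java.util.HashMap;
    import java.util.Map;

    public class RankOffsetDemo {
        static long offsetFor(int index, Map<Integer, Long> counts) {
            long offset = 0L;
            for (int i = 0; i < index; i++) {
                offset += counts.getOrDefault(i, 0L);  // empty partitions add nothing
            }
            return offset;
        }

        public static void main(String[] args) {
            Map<Integer, Long> counts = new HashMap<>();
            counts.put(0, 5L);  // partition 0 produced 5 rows
            counts.put(1, 3L);  // partition 1 produced 3 rows
            // A row with local rank 2 in partition 2 gets global rank 5 + 3 + 2 = 10.
            System.out.println(offsetFor(2, counts) + 2L);  // prints 10
        }
    }
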
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Wed Apr 12 02:20:20 2017
@@ -31,6 +31,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.data.DataBag;
@@ -54,14 +55,14 @@ public class ReduceByConverter implement
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException {
SparkUtil.assertPredecessorSize(predecessors, op, 1);
- int parallelism = SparkUtil.getParallelism(predecessors, op);
+ int parallelism = SparkPigContext.get().getParallelism(predecessors, op);
RDD<Tuple> rdd = predecessors.get(0);
RDD<Tuple2<IndexedKey, Tuple>> rddPair
- = rdd.map(new LocalRearrangeFunction(op.getLgr(), op.isUseSecondaryKey(), op.getSecondarySortOrder())
+ = rdd.map(new LocalRearrangeFunction(op.getLROp(), op.isUseSecondaryKey(), op.getSecondarySortOrder())
, SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
if (op.isUseSecondaryKey()) {
- return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPkg());
+ return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPKGOp());
} else {
PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
= new PairRDDFunctions<>(rddPair,
@@ -189,8 +190,8 @@ public class ReduceByConverter implement
t.append(key);
t.append(bag);
- poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
- Tuple packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
+ poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+ Tuple packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result;
// Perform the operation
LOG.debug("MergeValuesFunction packagedTuple : " + t);
@@ -242,8 +243,8 @@ public class ReduceByConverter implement
bag.add((Tuple) v1._2().get(1));
t.append(key);
t.append(bag);
- poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
- packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
+ poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+ packagedTuple = (Tuple) poReduce.getPKGOp().getPkgr().getNext().result;
} catch (ExecException e) {
throw new RuntimeException(e);
}
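
For background, the ReduceBy path behaves like a map-side combiner: rows are first rearranged into (key, value) pairs, and reduceByKey then merges two values for the same key at a time until one remains. The pairwise-merge shape, sketched with plain Java collections instead of the Pig packager:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReduceByDemo {
        public static void main(String[] args) {
            // (key, value) pairs as a local rearrange would emit them
            List<Map.Entry<String, Long>> pairs = List.of(
                    Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L));
            Map<String, Long> reduced = new HashMap<>();
            for (Map.Entry<String, Long> p : pairs) {
                // the merge function is applied pairwise, like Spark's reduceByKey
                reduced.merge(p.getKey(), p.getValue(), Long::sum);
            }
            System.out.println(reduced);  // {a=4, b=2}
        }
    }
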
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Wed Apr 12 02:20:20 2017
@@ -28,6 +28,7 @@ import java.util.HashMap;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.util.Pair;
@@ -84,10 +85,10 @@ public class SkewedJoinConverter impleme
RDD<Tuple> rdd1 = predecessors.get(0);
RDD<Tuple> rdd2 = predecessors.get(1);
- Broadcast<List<Tuple>> keyDist = SparkUtil.getBroadcastedVars().get(skewedJoinPartitionFile);
+ Broadcast<List<Tuple>> keyDist = SparkPigContext.get().getBroadcastedVars().get(skewedJoinPartitionFile);
// if no keyDist, we need defaultParallelism
- Integer defaultParallelism = SparkUtil.getParallelism(predecessors, poSkewedJoin);
+ Integer defaultParallelism = SparkPigContext.get().getParallelism(predecessors, poSkewedJoin);
// with partition id
SkewPartitionIndexKeyFunction skewFun = new SkewPartitionIndexKeyFunction(this, keyDist, defaultParallelism);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Wed Apr 12 02:20:20 2017
@@ -28,6 +28,7 @@ import scala.runtime.AbstractFunction1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaPairRDD;
@@ -46,7 +47,7 @@ public class SortConverter implements RD
throws IOException {
SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
- int parallelism = SparkUtil.getParallelism(predecessors, sortOperator);
+ int parallelism = SparkPigContext.get().getParallelism(predecessors, sortOperator);
RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
SparkUtil.<Tuple, Object> getTuple2Manifest());
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Wed Apr 12 02:20:20 2017
@@ -71,7 +71,7 @@ public class StoreConverter implements
RDD<Tuple> rdd = predecessors.get(0);
SparkPigStatusReporter.getInstance().createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP,
- SparkStatsUtil.getStoreSparkCounterName(op));
+ SparkStatsUtil.getCounterName(op));
// convert back to KV pairs
JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(
@@ -166,7 +166,7 @@ public class StoreConverter implements
if (!op.isTmpStore() && !disableCounter) {
ftf.setDisableCounter(disableCounter);
ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP);
- ftf.setCounterName(SparkStatsUtil.getStoreSparkCounterName(op));
+ ftf.setCounterName(SparkStatsUtil.getCounterName(op));
SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
ftf.setSparkCounters(counterReporter.getCounters());
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Apr 12 02:20:20 2017
@@ -31,50 +31,50 @@ import org.apache.spark.api.java.functio
import org.apache.spark.rdd.RDD;
public class StreamConverter implements
- RDDConverter<Tuple, Tuple, POStream> {
+ RDDConverter<Tuple, Tuple, POStream> {
- @Override
- public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- POStream poStream) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
- RDD<Tuple> rdd = predecessors.get(0);
- StreamFunction streamFunction = new StreamFunction(poStream);
- return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
- }
-
- private static class StreamFunction implements
- FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
- private POStream poStream;
-
- private StreamFunction(POStream poStream) {
- this.poStream = poStream;
- }
-
- public Iterable<Tuple> call(final Iterator<Tuple> input) {
- return new Iterable<Tuple>() {
- @Override
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(input) {
-
- @Override
- protected void attach(Tuple tuple) {
- poStream.setInputs(null);
- poStream.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- Result result = poStream.getNextTuple();
- return result;
- }
-
- @Override
- protected void endOfInput() {
- poStream.setFetchable(true);
- }
- };
- }
- };
- }
- }
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+ POStream poStream) throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ StreamFunction streamFunction = new StreamFunction(poStream);
+ return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+ }
+
+ private static class StreamFunction implements
+ FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+ private POStream poStream;
+
+ private StreamFunction(POStream poStream) {
+ this.poStream = poStream;
+ }
+
+ public Iterable<Tuple> call(final Iterator<Tuple> input) {
+ return new Iterable<Tuple>() {
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new OutputConsumerIterator(input) {
+
+ @Override
+ protected void attach(Tuple tuple) {
+ poStream.setInputs(null);
+ poStream.attachInput(tuple);
+ }
+
+ @Override
+ protected Result getNextResult() throws ExecException {
+ Result result = poStream.getNextTuple();
+ return result;
+ }
+
+ @Override
+ protected void endOfInput() {
+ poStream.setFetchable(true);
+ }
+ };
+ }
+ };
+ }
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java Wed Apr 12 02:20:20 2017
@@ -43,11 +43,11 @@ public class POJoinGroupSpark extends Ph
this.pkgOp = pkgOp;
}
- public List<POLocalRearrange> getLraOps() {
+ public List<POLocalRearrange> getLROps() {
return lraOps;
}
- public POGlobalRearrangeSpark getGlaOp() {
+ public POGlobalRearrangeSpark getGROp() {
return glaOp;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java Wed Apr 12 02:20:20 2017
@@ -32,11 +32,10 @@ import org.apache.pig.impl.plan.VisitorE
public class POPoissonSampleSpark extends POPoissonSample {
private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class);
- private static final long serialVersionUID = 1L;
-
-
+ //TODO verify can be removed?
+ //private static final long serialVersionUID = 1L;
// Only for Spark
- private boolean endOfInput = false;
+ private transient boolean endOfInput = false;
public boolean isEndOfInput() {
return endOfInput;
@@ -50,12 +49,6 @@ public class POPoissonSampleSpark extend
super(k, rp, sr, hp, tm);
}
-
- @Override
- public void visit(PhyPlanVisitor v) throws VisitorException {
- v.visitPoissonSampleSpark(this);
- }
-
@Override
public Result getNextTuple() throws ExecException {
if (!initialized) {
@@ -115,10 +108,12 @@ public class POPoissonSampleSpark extend
Result pickedSample = newSample;
updateSkipInterval((Tuple) pickedSample.result);
- LOG.debug("pickedSample:");
- if(pickedSample.result!=null){
- for(int i=0;i<((Tuple) pickedSample.result).size();i++) {
- LOG.debug("the "+i+" ele:"+((Tuple) pickedSample.result).get(i));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("pickedSample:");
+ if (pickedSample.result != null) {
+ for (int i = 0; i < ((Tuple) pickedSample.result).size(); i++) {
+ LOG.debug("the " + i + " ele:" + ((Tuple) pickedSample.result).get(i));
+ }
}
}
return pickedSample;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Wed Apr 12 02:20:20 2017
@@ -43,7 +43,7 @@ public class POReduceBySpark extends POF
this.addOriginalLocation(lr.getAlias(), lr.getOriginalLocations());
}
- public POPackage getPkg() {
+ public POPackage getPKGOp() {
return pkg;
}
@@ -98,7 +98,7 @@ public class POReduceBySpark extends POF
this.customPartitioner = customPartitioner;
}
- public POLocalRearrange getLgr() {
+ public POLocalRearrange getLROp() {
return lr;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java Wed Apr 12 02:20:20 2017
@@ -17,6 +17,10 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -24,6 +28,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
+import java.util.List;
+
/**
* A visitor to optimize plans that determines if a vertex plan can run in
* accumulative mode.
@@ -31,13 +37,24 @@ import org.apache.pig.impl.plan.VisitorE
public class AccumulatorOptimizer extends SparkOpPlanVisitor {
public AccumulatorOptimizer(SparkOperPlan plan) {
- super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+ super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
}
@Override
public void visitSparkOp(SparkOperator sparkOperator) throws
- VisitorException {
- AccumulatorOptimizerUtil.addAccumulatorSpark(sparkOperator
- .physicalPlan);
+ VisitorException {
+ PhysicalPlan plan = sparkOperator.physicalPlan;
+ List<PhysicalOperator> pos = plan.getRoots();
+ if (pos == null || pos.size() == 0) {
+ return;
+ }
+
+ List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(plan,
+ POGlobalRearrange.class);
+
+ for (POGlobalRearrange glr : glrs) {
+ List<PhysicalOperator> successors = plan.getSuccessors(glr);
+ AccumulatorOptimizerUtil.addAccumulator(plan, successors);
+ }
}
}
\ No newline at end of file
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Wed Apr 12 02:20:20 2017
@@ -325,7 +325,7 @@ public class CombinerOptimizer extends S
// Update the ReduceBy Operator with the packaging used by Local rearrange.
private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
- Packager pkgr = reduceOperator.getPkg().getPkgr();
+ Packager pkgr = reduceOperator.getPKGOp().getPkgr();
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java Wed Apr 12 02:20:20 2017
@@ -72,7 +72,7 @@ public class JoinGroupOptimizerSpark ext
try {
restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp);
} catch (PlanException e) {
- throw new RuntimeException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
+ throw new VisitorException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
}
}
}
@@ -90,10 +90,7 @@ public class JoinGroupOptimizerSpark ext
PhysicalPlan currentPlan = this.mCurrentWalker.getPlan();//If there are POSplit, we need traverse the POSplit.getPlans(), so use mCurrentWalker.getPlan()
if( currentPlan != null) {
plansWithJoinAndGroup.add(currentPlan);
- }else{
- LOG.info("GlobalRearrangeDiscover#currentPlan is null");
}
-
}
public List<PhysicalPlan> getPlansWithJoinAndGroup() {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java Wed Apr 12 02:20:20 2017
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopFilterRemoverUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -30,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -61,29 +61,11 @@ public class NoopFilterRemover extends S
if (value instanceof Boolean) {
Boolean filterValue = (Boolean) value;
if (filterValue) {
- removeFilter(filter, sparkOp.physicalPlan);
+ NoopFilterRemoverUtil.removeFilter(filter, sparkOp.physicalPlan);
}
}
}
}
}
}
-
- private void removeFilter(POFilter filter, PhysicalPlan plan) {
- if (plan.size() > 1) {
- try {
- List<PhysicalOperator> fInputs = filter.getInputs();
- List<PhysicalOperator> sucs = plan.getSuccessors(filter);
-
- plan.removeAndReconnect(filter);
- if (sucs != null && sucs.size() != 0) {
- for (PhysicalOperator suc : sucs) {
- suc.setInputs(fInputs);
- }
- }
- } catch (PlanException pe) {
- log.info("Couldn't remove a filter in optimizer: " + pe.getMessage());
- }
- }
- }
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java Wed Apr 12 02:20:20 2017
@@ -64,13 +64,13 @@ public class SecondaryKeyOptimizerSpark
List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLocalRearrange.class);
if (rearranges.isEmpty()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("No POLocalRearranges found in the sparkOperator.Secondary key optimization is no need");
+ LOG.debug("No POLocalRearranges found in the spark operator" + sparkOperator.getOperatorKey() + ". Skipping secondary key optimization.");
}
return;
}
/**
- * When every POLocalRearrange is encounted in the sparkOperator.physicalPlan,
+ * Whenever a POLocalRearrange is encountered in the sparkOperator.physicalPlan,
* the sub-physicalplan between the previousLR(or root) to currentLR is considered as mapPlan(like what
* we call in mapreduce) and the sub-physicalplan between the POGlobalRearrange(the successor of currentLR) and
* nextLR(or leaf) is considered as reducePlan(like what we call in mapreduce). After mapPlan and reducePlan are got,
@@ -109,9 +109,8 @@ public class SecondaryKeyOptimizerSpark
// The POLocalRearrange is sub-plan of a POSplit
mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan, currentLR.getOperatorKey());
}
-
- SecondaryKeyOptimizerUtil.setIsSparkMode(true);
- SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan);
+ SparkSecondaryKeyOptimizerUtil sparkSecondaryKeyOptUtil = new SparkSecondaryKeyOptimizerUtil();
+ SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = sparkSecondaryKeyOptUtil.applySecondaryKeySort(mapPlan, reducePlan);
if (info != null) {
numSortRemoved += info.getNumSortRemoved();
numDistinctChanged += info.getNumDistinctChanged();
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java?rev=1791060&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java Wed Apr 12 02:20:20 2017
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+
+import java.util.List;
+
+public class SparkSecondaryKeyOptimizerUtil extends SecondaryKeyOptimizerUtil{
+ private static Log log = LogFactory.getLog(SparkSecondaryKeyOptimizerUtil.class);
+
+ @Override
+ protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) {
+ PhysicalOperator currentNode = null;
+
+ if (!(root instanceof POGlobalRearrange)) {
+ log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing");
+ currentNode = null;
+ } else {
+ List<PhysicalOperator> globalRearrangeSuccs = reducePlan
+ .getSuccessors(root);
+ if (globalRearrangeSuccs.size() == 1) {
+ currentNode = globalRearrangeSuccs.get(0);
+ } else {
+ log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing");
+ currentNode = null;
+ }
+ }
+
+ return currentNode;
+ }
+}
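
The added class replaces the old static SecondaryKeyOptimizerUtil.setIsSparkMode(true) switch with a template-method hook: the shared utility drives applySecondaryKeySort and asks the engine-specific subclass only where the reduce-side plan starts. A stripped-down sketch of that pattern, with invented names:

    public class TemplateMethodSketch {
        abstract static class PlanOptimizer {
            // Template method: the algorithm skeleton lives in the base class.
            final String optimize(String reduceRoot) {
                String start = currentNode(reduceRoot);
                return start == null ? "skipped" : "optimized from " + start;
            }
            // Engine-specific hook, analogous to getCurrentNode above.
            abstract String currentNode(String reduceRoot);
        }

        static class SparkPlanOptimizer extends PlanOptimizer {
            @Override
            String currentNode(String reduceRoot) {
                return "globalRearrange".equals(reduceRoot) ? "package" : null;
            }
        }

        public static void main(String[] args) {
            System.out.println(new SparkPlanOptimizer().optimize("globalRearrange"));
        }
    }
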
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java Wed Apr 12 02:20:20 2017
@@ -41,8 +41,8 @@ public class DotSparkPrinter extends Dot
DotSparkPrinter.InnerOperator,
DotSparkPrinter.InnerPlan> {
- static int counter = 0;
- boolean isVerboseNesting = true;
+ private static int counter = 0;
+ private boolean isVerboseNesting = true;
public DotSparkPrinter(SparkOperPlan plan, PrintStream ps) {
this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),