You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/27 21:49:17 UTC
svn commit: r1561841 - in
/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine:
mapReduceLayer/partitioners/ tez/
Author: rohini
Date: Mon Jan 27 20:49:16 2014
New Revision: 1561841
URL: http://svn.apache.org/r1561841
Log:
PIG-3658: Use Tez ObjectRegistry to cache FRJoin map and WeightedRangePartitioner map (rohini)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Jan 27 20:49:16 2014
@@ -24,8 +24,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
@@ -58,11 +56,9 @@ import org.apache.pig.impl.util.Utils;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
implements Configurable {
- final public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts
- = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
- private static final Log log = LogFactory.getLog(WeightedRangePartitioner.class);
- private PigNullableWritable[] quantiles;
+ protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts;
+ protected PigNullableWritable[] quantiles;
private RawComparator<PigNullableWritable> comparator;
private PigContext pigContext;
private Configuration job;
@@ -94,6 +90,7 @@ public class WeightedRangePartitioner ex
@SuppressWarnings("unchecked")
public void init() {
+ weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
try {
pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
} catch (IOException e) {
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java?rev=1561841&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java Mon Jan 27 20:49:16 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+
+@InterfaceAudience.Private
+public class ObjectCache {
+
+ private static final Log LOG = LogFactory.getLog(ObjectCache.class);
+ private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+ private static ObjectCache cache = new ObjectCache();
+
+ private ObjectCache() {
+ }
+
+ public static ObjectCache getInstance() {
+ return cache;
+ }
+
+ public void cache(String key, Object value) {
+ LOG.info("Adding " + key + " to cache");
+ registry.add(ObjectLifeCycle.VERTEX, key, value);
+ }
+
+ public Object retrieve(String key) {
+ Object o = registry.get(key);
+ if (o != null) {
+ LOG.info("Found " + key + " in cache");
+ }
+ return o;
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Mon Jan 27 20:49:16 2014
@@ -56,7 +56,6 @@ public class POFRJoinTez extends POFRJoi
@SuppressWarnings("rawtypes")
private List<BroadcastKVReader> replReaders = Lists.newArrayList();
private List<String> inputKeys;
- private int currIdx = 0;
public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws ExecException {
super(copy);
@@ -88,6 +87,17 @@ public class POFRJoinTez extends POFRJoi
*/
@Override
protected void setUpHashMap() throws ExecException {
+ String cacheKey = "replicatemap-" + getOperatorKey().toString();
+
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ replicates = (TupleToMapKey[]) cacheValue;
+ log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
+ return;
+ }
+
+ log.info("Building replication hash table");
+
SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
@@ -107,9 +117,9 @@ public class POFRJoinTez extends POFRJoi
}
long time1 = System.currentTimeMillis();
- log.debug("Completed setup. Trying to build replication hash table");
replicates[fragment] = null;
+ int currIdx = 0;
while (currIdx < replInputs.size()) {
// We need to adjust the index because the number of replInputs is
// one less than the number of inputSchemas. The inputSchemas
@@ -118,7 +128,7 @@ public class POFRJoinTez extends POFRJoi
SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[adjustedIdx];
SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[adjustedIdx];
- TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+ TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
POLocalRearrange lr = LRs[adjustedIdx];
try {
@@ -155,7 +165,10 @@ public class POFRJoinTez extends POFRJoi
}
long time2 = System.currentTimeMillis();
- log.info("Hash Table built. Time taken: " + (time2 - time1));
+ log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+
+ ObjectCache.getInstance().cache(cacheKey, replicates);
+ log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
}
@Override
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java Mon Jan 27 20:49:16 2014
@@ -23,6 +23,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -51,6 +53,8 @@ import com.google.common.collect.Maps;
*/
public class POPartitionRearrangeTez extends POLocalRearrangeTez {
private static final long serialVersionUID = 1L;
+
+ private static final Log LOG = LogFactory.getLog(POPartitionRearrangeTez.class);
private static final TupleFactory tf = TupleFactory.getInstance();
private static final BagFactory mBagFactory = BagFactory.getInstance();
@@ -179,7 +183,22 @@ public class POPartitionRearrangeTez ext
return opBag;
}
+ @SuppressWarnings("unchecked")
private void init() throws RuntimeException {
+
+ ObjectCache cache = ObjectCache.getInstance();
+ String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
+ String totalReducersCacheKey = "sample-" + PigProcessor.sampleVertex + ".totalReducers";
+ String reducerMapCacheKey = "sample-" + PigProcessor.sampleVertex + ".reducerMap";
+ if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
+ totalReducers = (Integer) cache.retrieve(totalReducersCacheKey);
+ reducerMap = (Map<Object, Pair<Integer, Integer>>) cache.retrieve(reducerMapCacheKey);
+ LOG.info("Found totalReducers and reducerMap in Tez cache. cachekey="
+ + totalReducersCacheKey + "," + reducerMapCacheKey);
+ inited = true;
+ return;
+ }
+
Map<String, Object> distMap = null;
if (PigProcessor.sampleMap != null) {
// We've already collected sampleMap in PigProcessor
@@ -189,47 +208,50 @@ public class POPartitionRearrangeTez ext
" used but no key distribution found");
}
- try {
- if (distMap != null) {
- Integer[] totalReducers = new Integer[1];
+ long start = System.currentTimeMillis();
- // The distMap is structured as (key, min, max) where min, max
- // being the index of the reducers
- DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
- totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
- Iterator<Tuple> it = partitionList.iterator();
- while (it.hasNext()) {
- Tuple idxTuple = it.next();
- Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
- Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
- // Used to replace the maxIndex with the number of reducers
- if (maxIndex < minIndex) {
- maxIndex = totalReducers[0] + maxIndex;
- }
+ try {
+ // The distMap is structured as (key, min, max), where min and max
+ // are the indices of the reducers
+ DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+ totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+ Iterator<Tuple> it = partitionList.iterator();
+ while (it.hasNext()) {
+ Tuple idxTuple = it.next();
+ Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+ Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+ // Used to replace the maxIndex with the number of reducers
+ if (maxIndex < minIndex) {
+ maxIndex = totalReducers + maxIndex;
+ }
- Tuple keyT;
- // if the join is on more than 1 key
- if (idxTuple.size() > 3) {
- // remove the last 2 fields of the tuple, i.e: minIndex
- // and maxIndex and store it in the reducer map
- Tuple keyTuple = tf.newTuple();
- for (int i=0; i < idxTuple.size() - 2; i++) {
- keyTuple.append(idxTuple.get(i));
- }
- keyT = keyTuple;
- } else {
- keyT = tf.newTuple(1);
- keyT.set(0,idxTuple.get(0));
+ Tuple keyT;
+ // if the join is on more than 1 key
+ if (idxTuple.size() > 3) {
+ // remove the last 2 fields of the tuple, i.e: minIndex
+ // and maxIndex and store it in the reducer map
+ Tuple keyTuple = tf.newTuple();
+ for (int i=0; i < idxTuple.size() - 2; i++) {
+ keyTuple.append(idxTuple.get(i));
}
- // number of reducers
- Integer cnt = maxIndex - minIndex;
- // 1 is added to account for the 0 index
- reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+ keyT = keyTuple;
+ } else {
+ keyT = tf.newTuple(1);
+ keyT.set(0,idxTuple.get(0));
}
+ // number of reducers
+ Integer cnt = maxIndex - minIndex;
+ // 1 is added to account for the 0 index
+ reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ LOG.info("Initialized POPartitionRearrangeTez. Time taken: " + (System.currentTimeMillis() - start));
+ cache.cache(isCachedKey, Boolean.TRUE);
+ cache.cache(totalReducersCacheKey, totalReducers);
+ cache.cache(reducerMapCacheKey, reducerMap);
inited = true;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Jan 27 20:49:16 2014
@@ -64,11 +64,16 @@ public class PigProcessor implements Log
private Configuration conf;
- public static Map<String, Object> sampleMap = null;
+ public static String sampleVertex;
+ public static Map<String, Object> sampleMap;
@Override
public void initialize(TezProcessorContext processorContext)
throws Exception {
+ // Reset any static variables to avoid conflict in container-reuse.
+ sampleVertex = null;
+ sampleMap = null;
+
byte[] payload = processorContext.getUserPayload();
conf = TezUtils.createConfFromUserPayload(payload);
PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
@@ -106,8 +111,9 @@ public class PigProcessor implements Log
initializeOutputs(outputs);
- if (conf.get("pig.sampleVertex") != null) {
- collectSample((BroadcastKVReader)inputs.get(conf.get("pig.sampleVertex")).getReader());
+ sampleVertex = conf.get("pig.sampleVertex");
+ if (sampleVertex != null) {
+ collectSample(sampleVertex, inputs.get(sampleVertex));
}
List<PhysicalOperator> leaves = null;
@@ -201,7 +207,12 @@ public class PigProcessor implements Log
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void collectSample(BroadcastKVReader reader) throws IOException {
+ private void collectSample(String sampleVertex, LogicalInput logicalInput) throws Exception {
+ Boolean cached = (Boolean) ObjectCache.getInstance().retrieve("cached.sample." + sampleVertex);
+ if (cached == Boolean.TRUE) {
+ return;
+ }
+ BroadcastKVReader reader = (BroadcastKVReader) logicalInput.getReader();
reader.next();
Object val = reader.getCurrentValue();
NullableTuple nTup = (NullableTuple) val;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Mon Jan 27 20:49:16 2014
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
import java.util.Iterator;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -28,10 +30,26 @@ import org.apache.pig.impl.builtin.Parti
import org.apache.pig.impl.util.Pair;
public class SkewedPartitionerTez extends SkewedPartitioner {
+ private static final Log LOG = LogFactory.getLog(SkewedPartitionerTez.class);
private static final TupleFactory tf = TupleFactory.getInstance();
+ @SuppressWarnings("unchecked")
@Override
protected void init() {
+
+ ObjectCache cache = ObjectCache.getInstance();
+ String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
+ String totalReducersCacheKey = "sample-" + PigProcessor.sampleVertex + ".totalReducers";
+ String reducerMapCacheKey = "sample-" + PigProcessor.sampleVertex + ".reducerMap";
+ if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
+ totalReducers = (Integer) cache.retrieve(totalReducersCacheKey);
+ reducerMap = (Map<Tuple, Pair<Integer, Integer>>) cache.retrieve(reducerMapCacheKey);
+ LOG.info("Found totalReducers and reducerMap in Tez cache. cachekey="
+ + totalReducersCacheKey + "," + reducerMapCacheKey);
+ inited = true;
+ return;
+ }
+
Map<String, Object> distMap = null;
if (PigProcessor.sampleMap != null) {
// We've collected sampleMap in PigProcessor
@@ -41,46 +59,49 @@ public class SkewedPartitionerTez extend
" used but no key distribution found");
}
- try {
- if (distMap != null) {
+ long start = System.currentTimeMillis();
- // The distMap is structured as (key, min, max) where min, max
- // being the index of the reducers
- DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
- totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
- Iterator<Tuple> it = partitionList.iterator();
- while (it.hasNext()) {
- Tuple idxTuple = it.next();
- Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
- Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
- // Used to replace the maxIndex with the number of reducers
- if (maxIndex < minIndex) {
- maxIndex = totalReducers + maxIndex;
- }
+ try {
+ // The distMap is structured as (key, min, max), where min and max
+ // are the indices of the reducers
+ DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+ totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+ Iterator<Tuple> it = partitionList.iterator();
+ while (it.hasNext()) {
+ Tuple idxTuple = it.next();
+ Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+ Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+ // Used to replace the maxIndex with the number of reducers
+ if (maxIndex < minIndex) {
+ maxIndex = totalReducers + maxIndex;
+ }
- Tuple keyT;
- // if the join is on more than 1 key
- if (idxTuple.size() > 3) {
- // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
- // it in the reducer map
- Tuple keyTuple = tf.newTuple();
- for (int i=0; i < idxTuple.size() - 2; i++) {
- keyTuple.append(idxTuple.get(i));
- }
- keyT = keyTuple;
- } else {
- keyT = tf.newTuple(1);
- keyT.set(0,idxTuple.get(0));
+ Tuple keyT;
+ // if the join is on more than 1 key
+ if (idxTuple.size() > 3) {
+ // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
+ // it in the reducer map
+ Tuple keyTuple = tf.newTuple();
+ for (int i=0; i < idxTuple.size() - 2; i++) {
+ keyTuple.append(idxTuple.get(i));
}
- // number of reducers
- Integer cnt = maxIndex - minIndex;
- // 1 is added to account for the 0 index
- reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+ keyT = keyTuple;
+ } else {
+ keyT = tf.newTuple(1);
+ keyT.set(0,idxTuple.get(0));
}
+ // number of reducers
+ Integer cnt = maxIndex - minIndex;
+ // 1 is added to account for the 0 index
+ reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
+ LOG.info("Initialized SkewedPartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+ cache.cache(isCachedKey, Boolean.TRUE);
+ cache.cache(totalReducersCacheKey, totalReducers);
+ cache.cache(reducerMapCacheKey, reducerMap);
inited = true;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Jan 27 20:49:16 2014
@@ -727,15 +727,6 @@ public class TezCompiler extends PhyPlan
@Override
public void visitFRJoin(POFRJoin op) throws VisitorException {
try {
- FileSpec[] replFiles = new FileSpec[op.getInputs().size()];
- for (int i = 0; i < replFiles.length; i++) {
- if (i == op.getFragment()) {
- continue;
- }
- replFiles[i] = getTempFileSpec();
- }
- op.setReplFiles(replFiles);
-
List<String> inputKeys = Lists.newArrayList();
curTezOp = phyToTezOpMap.get(op.getInputs().get(op.getFragment()));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Mon Jan 27 20:49:16 2014
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -24,17 +25,33 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
-import org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.io.PigNullableWritable;
public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
- private static final Log log = LogFactory.getLog(WeightedRangePartitionerTez.class);
+ private static final Log LOG = LogFactory.getLog(WeightedRangePartitionerTez.class);
+ @SuppressWarnings("unchecked")
@Override
public void init() {
+ ObjectCache cache = ObjectCache.getInstance();
+ String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
+ String quantilesCacheKey = "sample-" + PigProcessor.sampleVertex + ".quantiles";
+ String weightedPartsCacheKey = "sample-" + PigProcessor.sampleVertex + ".weightedParts";
+ if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
+ quantiles = (PigNullableWritable[]) cache
+ .retrieve(quantilesCacheKey);
+ weightedParts = (Map<PigNullableWritable, DiscreteProbabilitySampleGenerator>) cache
+ .retrieve(weightedPartsCacheKey);
+ LOG.info("Found quantiles and weightedParts in Tez cache. cachekey="
+ + quantilesCacheKey + "," + weightedPartsCacheKey);
+ inited = true;
+ return;
+ }
+
Map<String, Object> quantileMap = null;
if (PigProcessor.sampleMap != null) {
// We've collected sampleMap in PigProcessor
@@ -44,21 +61,26 @@ public class WeightedRangePartitionerTez
+ " used but no quantiles found");
}
+ long start = System.currentTimeMillis();
try {
- if (quantileMap != null) {
- DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
- InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
- convertToArray(quantilesList);
- for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
- Tuple key = (Tuple)ent.getKey(); // sample item which repeats
- float[] probVec = getProbVec((Tuple)ent.getValue());
- weightedParts.put(getPigNullableWritable(key),
- new DiscreteProbabilitySampleGenerator(probVec));
- }
+ weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+ DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+ InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
+ convertToArray(quantilesList);
+ for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
+ Tuple key = (Tuple) ent.getKey(); // sample item which repeats
+ float[] probVec = getProbVec((Tuple) ent.getValue());
+ weightedParts.put(getPigNullableWritable(key),
+ new DiscreteProbabilitySampleGenerator(probVec));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ LOG.info("Initialized WeightedRangePartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+ cache.cache(isCachedKey, Boolean.TRUE);
+ cache.cache(quantilesCacheKey, quantiles);
+ cache.cache(weightedPartsCacheKey, weightedParts);
inited = true;
}
}