Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/27 21:49:17 UTC

svn commit: r1561841 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine: mapReduceLayer/partitioners/ tez/

Author: rohini
Date: Mon Jan 27 20:49:16 2014
New Revision: 1561841

URL: http://svn.apache.org/r1561841
Log:
PIG-3658: Use Tez ObjectRegistry to cache FRJoin map and WeightedRangePartitioner map (rohini)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
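
The commit adds a small ObjectCache singleton that wraps the Tez ObjectRegistry with vertex lifecycle, so expensive per-vertex structures (the FRJoin replicate hash tables, the skewed and weighted-range partitioner reducer maps and quantiles) are built once and then reused by later tasks of the same vertex when containers are reused. A minimal sketch of the retrieve-or-build pattern the modified operators follow is shown below; the helper class and buildExpensiveStructure() are illustrative only, while ObjectCache.getInstance(), retrieve() and cache() come from the class added in this commit:

    import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;

    // Illustrative helper, not part of the commit.
    public class CachedStructureLoader {

        public Object loadOrBuild(String operatorKey) {
            // The commit derives keys from the operator key ("replicatemap-<opkey>")
            // or from the sample vertex name ("sample-<vertex>.<suffix>").
            String cacheKey = "replicatemap-" + operatorKey;
            Object cached = ObjectCache.getInstance().retrieve(cacheKey);
            if (cached != null) {
                return cached; // reuse across tasks of the same vertex
            }
            Object built = buildExpensiveStructure();
            // ObjectCache.cache() stores the value with ObjectLifeCycle.VERTEX scope.
            ObjectCache.getInstance().cache(cacheKey, built);
            return built;
        }

        private Object buildExpensiveStructure() {
            return new Object(); // placeholder for e.g. building the replicate hash tables
        }
    }

POFRJoinTez, POPartitionRearrangeTez, SkewedPartitionerTez and WeightedRangePartitionerTez in the diffs below all follow this shape.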

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Jan 27 20:49:16 2014
@@ -24,8 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
@@ -58,11 +56,9 @@ import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
                                       implements Configurable {
-    final public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts
-        = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
 
-    private static final Log log = LogFactory.getLog(WeightedRangePartitioner.class);
-    private PigNullableWritable[] quantiles;
+    protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts;
+    protected PigNullableWritable[] quantiles;
     private RawComparator<PigNullableWritable> comparator;
     private PigContext pigContext;
     private Configuration job;
@@ -94,6 +90,7 @@ public class WeightedRangePartitioner ex
 
     @SuppressWarnings("unchecked")
     public void init() {
+        weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
         try {
             pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
         } catch (IOException e) {

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java?rev=1561841&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ObjectCache.java Mon Jan 27 20:49:16 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+
+@InterfaceAudience.Private
+public class ObjectCache {
+
+    private static final Log LOG = LogFactory.getLog(ObjectCache.class);
+    private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+    private static ObjectCache cache = new ObjectCache();
+
+    private ObjectCache() {
+    }
+
+    public static ObjectCache getInstance() {
+        return cache;
+    }
+
+    public void cache(String key, Object value) {
+      LOG.info("Adding " + key + " to cache");
+      registry.add(ObjectLifeCycle.VERTEX, key, value);
+    }
+
+    public Object retrieve(String key) {
+      Object o = registry.get(key);
+      if (o != null) {
+        LOG.info("Found " + key + " in cache");
+      }
+      return o;
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Mon Jan 27 20:49:16 2014
@@ -56,7 +56,6 @@ public class POFRJoinTez extends POFRJoi
     @SuppressWarnings("rawtypes")
     private List<BroadcastKVReader> replReaders = Lists.newArrayList();
     private List<String> inputKeys;
-    private int currIdx = 0;
 
     public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws ExecException {
        super(copy);
@@ -88,6 +87,17 @@ public class POFRJoinTez extends POFRJoi
      */
     @Override
     protected void setUpHashMap() throws ExecException {
+        String cacheKey = "replicatemap-" + getOperatorKey().toString();
+
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            replicates = (TupleToMapKey[]) cacheValue;
+            log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
+            return;
+        }
+
+        log.info("Building replication hash table");
+
         SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
         SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
 
@@ -107,9 +117,9 @@ public class POFRJoinTez extends POFRJoi
         }
 
         long time1 = System.currentTimeMillis();
-        log.debug("Completed setup. Trying to build replication hash table");
 
         replicates[fragment] = null;
+        int currIdx = 0;
         while (currIdx < replInputs.size()) {
             // We need to adjust the index because the number of replInputs is
             // one less than the number of inputSchemas. The inputSchemas
@@ -118,7 +128,7 @@ public class POFRJoinTez extends POFRJoi
             SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[adjustedIdx];
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[adjustedIdx];
 
-            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+            TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
             POLocalRearrange lr = LRs[adjustedIdx];
 
             try {
@@ -155,7 +165,10 @@ public class POFRJoinTez extends POFRJoi
         }
 
         long time2 = System.currentTimeMillis();
-        log.info("Hash Table built. Time taken: " + (time2 - time1));
+        log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+
+        ObjectCache.getInstance().cache(cacheKey, replicates);
+        log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
     }
 
     @Override

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java Mon Jan 27 20:49:16 2014
@@ -23,6 +23,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -51,6 +53,8 @@ import com.google.common.collect.Maps;
  */
 public class POPartitionRearrangeTez extends POLocalRearrangeTez {
     private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(POPartitionRearrangeTez.class);
     private static final TupleFactory tf = TupleFactory.getInstance();
     private static final BagFactory mBagFactory = BagFactory.getInstance();
 
@@ -179,7 +183,22 @@ public class POPartitionRearrangeTez ext
         return opBag;
     }
 
+    @SuppressWarnings("unchecked")
     private void init() throws RuntimeException {
+
+        ObjectCache cache = ObjectCache.getInstance();
+        String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
+        String totalReducersCacheKey = "sample-" + PigProcessor.sampleVertex + ".totalReducers";
+        String reducerMapCacheKey = "sample-" + PigProcessor.sampleVertex + ".reducerMap";
+        if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
+            totalReducers = (Integer) cache.retrieve(totalReducersCacheKey);
+            reducerMap = (Map<Object, Pair<Integer, Integer>>) cache.retrieve(reducerMapCacheKey);
+            LOG.info("Found totalReducers and reducerMap in Tez cache. cachekey="
+                    + totalReducersCacheKey + "," + reducerMapCacheKey);
+            inited = true;
+            return;
+        }
+
         Map<String, Object> distMap = null;
         if (PigProcessor.sampleMap != null) {
             // We've already collected sampleMap in PigProcessor
@@ -189,47 +208,50 @@ public class POPartitionRearrangeTez ext
                     " used but no key distribution found");
         }
 
-        try {
-            if (distMap != null) {
-                Integer[] totalReducers = new Integer[1];
+        long start = System.currentTimeMillis();
 
-                // The distMap is structured as (key, min, max) where min, max
-                // being the index of the reducers
-                DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
-                totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
-                Iterator<Tuple> it = partitionList.iterator();
-                while (it.hasNext()) {
-                    Tuple idxTuple = it.next();
-                    Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
-                    Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
-                    // Used to replace the maxIndex with the number of reducers
-                    if (maxIndex < minIndex) {
-                        maxIndex = totalReducers[0] + maxIndex;
-                    }
+        try {
+            // The distMap is structured as (key, min, max) where min, max
+            // being the index of the reducers
+            DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+            totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+            Iterator<Tuple> it = partitionList.iterator();
+            while (it.hasNext()) {
+                Tuple idxTuple = it.next();
+                Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+                Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+                // Used to replace the maxIndex with the number of reducers
+                if (maxIndex < minIndex) {
+                    maxIndex = totalReducers + maxIndex;
+                }
 
-                    Tuple keyT;
-                    // if the join is on more than 1 key
-                    if (idxTuple.size() > 3) {
-                        // remove the last 2 fields of the tuple, i.e: minIndex
-                        // and maxIndex and store it in the reducer map
-                        Tuple keyTuple = tf.newTuple();
-                        for (int i=0; i < idxTuple.size() - 2; i++) {
-                            keyTuple.append(idxTuple.get(i));
-                        }
-                        keyT = keyTuple;
-                    } else {
-                        keyT = tf.newTuple(1);
-                        keyT.set(0,idxTuple.get(0));
+                Tuple keyT;
+                // if the join is on more than 1 key
+                if (idxTuple.size() > 3) {
+                    // remove the last 2 fields of the tuple, i.e: minIndex
+                    // and maxIndex and store it in the reducer map
+                    Tuple keyTuple = tf.newTuple();
+                    for (int i=0; i < idxTuple.size() - 2; i++) {
+                        keyTuple.append(idxTuple.get(i));
                     }
-                    // number of reducers
-                    Integer cnt = maxIndex - minIndex;
-                    // 1 is added to account for the 0 index
-                    reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+                    keyT = keyTuple;
+                } else {
+                    keyT = tf.newTuple(1);
+                    keyT.set(0,idxTuple.get(0));
                 }
+                // number of reducers
+                Integer cnt = maxIndex - minIndex;
+                // 1 is added to account for the 0 index
+                reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+
+        LOG.info("Initialized POPartitionRearrangeTez. Time taken: " + (System.currentTimeMillis() - start));
+        cache.cache(isCachedKey, Boolean.TRUE);
+        cache.cache(totalReducersCacheKey, totalReducers);
+        cache.cache(reducerMapCacheKey, reducerMap);
         inited = true;
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Jan 27 20:49:16 2014
@@ -64,11 +64,16 @@ public class PigProcessor implements Log
 
     private Configuration conf;
 
-    public static Map<String, Object> sampleMap = null;
+    public static String sampleVertex;
+    public static Map<String, Object> sampleMap;
 
     @Override
     public void initialize(TezProcessorContext processorContext)
             throws Exception {
+        // Reset any static variables to avoid conflicts with container reuse.
+        sampleVertex = null;
+        sampleMap = null;
+
         byte[] payload = processorContext.getUserPayload();
         conf = TezUtils.createConfFromUserPayload(payload);
         PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
@@ -106,8 +111,9 @@ public class PigProcessor implements Log
 
         initializeOutputs(outputs);
 
-        if (conf.get("pig.sampleVertex") != null) {
-            collectSample((BroadcastKVReader)inputs.get(conf.get("pig.sampleVertex")).getReader());
+        sampleVertex = conf.get("pig.sampleVertex");
+        if (sampleVertex != null) {
+            collectSample(sampleVertex, inputs.get(sampleVertex));
         }
 
         List<PhysicalOperator> leaves = null;
@@ -201,7 +207,12 @@ public class PigProcessor implements Log
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void collectSample(BroadcastKVReader reader) throws IOException {
+    private void collectSample(String sampleVertex, LogicalInput logicalInput) throws Exception {
+        Boolean cached = (Boolean) ObjectCache.getInstance().retrieve("cached.sample." + sampleVertex);
+        if (cached == Boolean.TRUE) {
+            return;
+        }
+        BroadcastKVReader reader = (BroadcastKVReader) logicalInput.getReader();
         reader.next();
         Object val = reader.getCurrentValue();
         NullableTuple nTup = (NullableTuple) val;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Mon Jan 27 20:49:16 2014
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -28,10 +30,26 @@ import org.apache.pig.impl.builtin.Parti
 import org.apache.pig.impl.util.Pair;
 
 public class SkewedPartitionerTez extends SkewedPartitioner {
+    private static final Log LOG = LogFactory.getLog(SkewedPartitionerTez.class);
     private static final TupleFactory tf = TupleFactory.getInstance();
 
+    @SuppressWarnings("unchecked")
     @Override
     protected void init() {
+
+        ObjectCache cache = ObjectCache.getInstance();
+        String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
+        String totalReducersCacheKey = "sample-" + PigProcessor.sampleVertex + ".totalReducers";
+        String reducerMapCacheKey = "sample-" + PigProcessor.sampleVertex + ".reducerMap";
+        if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
+            totalReducers = (Integer) cache.retrieve(totalReducersCacheKey);
+            reducerMap = (Map<Tuple, Pair<Integer, Integer>>) cache.retrieve(reducerMapCacheKey);
+            LOG.info("Found totalReducers and reducerMap in Tez cache. cachekey="
+                    + totalReducersCacheKey + "," + reducerMapCacheKey);
+            inited = true;
+            return;
+        }
+
         Map<String, Object> distMap = null;
         if (PigProcessor.sampleMap != null) {
             // We've collected sampleMap in PigProcessor
@@ -41,46 +59,49 @@ public class SkewedPartitionerTez extend
                     " used but no key distribution found");
         }
 
-        try {
-            if (distMap != null) {
+        long start = System.currentTimeMillis();
 
-                // The distMap is structured as (key, min, max) where min, max
-                // being the index of the reducers
-                DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
-                totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
-                Iterator<Tuple> it = partitionList.iterator();
-                while (it.hasNext()) {
-                    Tuple idxTuple = it.next();
-                    Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
-                    Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
-                    // Used to replace the maxIndex with the number of reducers
-                    if (maxIndex < minIndex) {
-                        maxIndex = totalReducers + maxIndex;
-                    }
+        try {
+            // The distMap is structured as (key, min, max) where min, max
+            // being the index of the reducers
+            DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+            totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+            Iterator<Tuple> it = partitionList.iterator();
+            while (it.hasNext()) {
+                Tuple idxTuple = it.next();
+                Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+                Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+                // Used to replace the maxIndex with the number of reducers
+                if (maxIndex < minIndex) {
+                    maxIndex = totalReducers + maxIndex;
+                }
 
-                    Tuple keyT;
-                    // if the join is on more than 1 key
-                    if (idxTuple.size() > 3) {
-                        // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
-                        // it in the reducer map
-                        Tuple keyTuple = tf.newTuple();
-                        for (int i=0; i < idxTuple.size() - 2; i++) {
-                            keyTuple.append(idxTuple.get(i));
-                        }
-                        keyT = keyTuple;
-                    } else {
-                        keyT = tf.newTuple(1);
-                        keyT.set(0,idxTuple.get(0));
+                Tuple keyT;
+                // if the join is on more than 1 key
+                if (idxTuple.size() > 3) {
+                    // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
+                    // it in the reducer map
+                    Tuple keyTuple = tf.newTuple();
+                    for (int i=0; i < idxTuple.size() - 2; i++) {
+                        keyTuple.append(idxTuple.get(i));
                     }
-                    // number of reducers
-                    Integer cnt = maxIndex - minIndex;
-                    // 1 is added to account for the 0 index
-                    reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+                    keyT = keyTuple;
+                } else {
+                    keyT = tf.newTuple(1);
+                    keyT.set(0,idxTuple.get(0));
                 }
+                // number of reducers
+                Integer cnt = maxIndex - minIndex;
+                // 1 is added to account for the 0 index
+                reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        LOG.info("Initialized SkewedPartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+        cache.cache(isCachedKey, Boolean.TRUE);
+        cache.cache(totalReducersCacheKey, totalReducers);
+        cache.cache(reducerMapCacheKey, reducerMap);
         inited = true;
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Jan 27 20:49:16 2014
@@ -727,15 +727,6 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitFRJoin(POFRJoin op) throws VisitorException {
         try {
-            FileSpec[] replFiles = new FileSpec[op.getInputs().size()];
-            for (int i = 0; i < replFiles.length; i++) {
-                if (i == op.getFragment()) {
-                    continue;
-                }
-                replFiles[i] = getTempFileSpec();
-            }
-            op.setReplFiles(replFiles);
-
             List<String> inputKeys = Lists.newArrayList();
             curTezOp = phyToTezOpMap.get(op.getInputs().get(op.getFragment()));
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1561841&r1=1561840&r2=1561841&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Mon Jan 27 20:49:16 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -24,17 +25,33 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
-import org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.io.PigNullableWritable;
 
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
-    private static final Log log = LogFactory.getLog(WeightedRangePartitionerTez.class);
+    private static final Log LOG = LogFactory.getLog(WeightedRangePartitionerTez.class);
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init() {
+        ObjectCache cache = ObjectCache.getInstance();
+        String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
+        String quantilesCacheKey = "sample-" + PigProcessor.sampleVertex + ".quantiles";
+        String weightedPartsCacheKey = "sample-" + PigProcessor.sampleVertex + ".weightedParts";
+        if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
+            quantiles = (PigNullableWritable[]) cache
+                    .retrieve(quantilesCacheKey);
+            weightedParts = (Map<PigNullableWritable, DiscreteProbabilitySampleGenerator>) cache
+                    .retrieve(weightedPartsCacheKey);
+            LOG.info("Found quantiles and weightedParts in Tez cache. cachekey="
+                    + quantilesCacheKey + "," + weightedPartsCacheKey);
+            inited = true;
+            return;
+        }
+
         Map<String, Object> quantileMap = null;
         if (PigProcessor.sampleMap != null) {
             // We've collected sampleMap in PigProcessor
@@ -44,21 +61,26 @@ public class WeightedRangePartitionerTez
                     + " used but no quantiles found");
         }
 
+        long start = System.currentTimeMillis();
         try {
-            if (quantileMap != null) {
-                DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
-                InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
-                convertToArray(quantilesList);
-                for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
-                    Tuple key = (Tuple)ent.getKey(); // sample item which repeats
-                    float[] probVec = getProbVec((Tuple)ent.getValue());
-                    weightedParts.put(getPigNullableWritable(key),
-                            new DiscreteProbabilitySampleGenerator(probVec));
-                }
+            weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+            DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+            InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
+            convertToArray(quantilesList);
+            for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
+                Tuple key = (Tuple) ent.getKey(); // sample item which repeats
+                float[] probVec = getProbVec((Tuple) ent.getValue());
+                weightedParts.put(getPigNullableWritable(key),
+                        new DiscreteProbabilitySampleGenerator(probVec));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+
+        LOG.info("Initialized WeightedRangePartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+        cache.cache(isCachedKey, Boolean.TRUE);
+        cache.cache(quantilesCacheKey, quantiles);
+        cache.cache(weightedPartsCacheKey, weightedParts);
         inited = true;
     }
 }