You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/03/13 08:56:02 UTC

tajo git commit: TAJO-1383: Improve broadcast table cache. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 7f0569555 -> e1e38e231


TAJO-1383: Improve broadcast table cache. (jinho)

closes #404


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e1e38e23
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e1e38e23
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e1e38e23

Branch: refs/heads/master
Commit: e1e38e231867e4f6f953a7ec41f5f9d5ad242580
Parents: 7f05695
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Mar 13 16:55:25 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Mar 13 16:55:25 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../engine/planner/physical/HashJoinExec.java   |  51 ++++++--
 .../planner/physical/HashLeftOuterJoinExec.java |  74 ++++++++++-
 .../physical/PartitionMergeScanExec.java        |  17 ++-
 .../tajo/engine/planner/physical/ScanExec.java  |  72 +++++++++++
 .../engine/planner/physical/SeqScanExec.java    | 105 +++-------------
 .../apache/tajo/engine/utils/CacheHolder.java   |  97 +++++++++++++++
 .../apache/tajo/engine/utils/TableCache.java    |  84 +++++++++++++
 .../apache/tajo/engine/utils/TableCacheKey.java |  57 +++++++++
 .../apache/tajo/engine/utils/TupleCache.java    | 122 -------------------
 .../apache/tajo/engine/utils/TupleCacheKey.java |  57 ---------
 .../worker/ExecutionBlockSharedResource.java    |  26 ++++
 .../apache/tajo/worker/TaskAttemptContext.java  |   2 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |   4 +-
 .../apache/tajo/engine/util/TestTableCache.java | 109 +++++++++++++++++
 .../apache/tajo/engine/util/TestTupleCache.java |  89 --------------
 .../plan/serder/LogicalNodeDeserializer.java    |   3 +
 .../tajo/plan/serder/LogicalNodeSerializer.java |   2 +
 tajo-plan/src/main/proto/Plan.proto             |   1 +
 19 files changed, 599 insertions(+), 375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e6f7917..84a7571 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1383: Improve broadcast table cache. (jinho)
+
     TAJO-1374: Support multi-bytes delimiter for CSV file.
     (Contributed by navis, Committed by jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index d475b78..3bdf2d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -19,15 +19,18 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCacheKey;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -58,13 +61,14 @@ public class HashJoinExec extends BinaryPhysicalExec {
   // projection
   protected final Projector projector;
 
+  private TableStats cachedRightTableStats;
+
   public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
       PhysicalExec rightExec) {
     super(context, SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(),
         leftExec, rightExec);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
-    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(100000);
 
     // HashJoin only can manage equi join key pairs.
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
@@ -151,8 +155,41 @@ public class HashJoinExec extends BinaryPhysicalExec {
   }
 
   protected void loadRightToHashTable() throws IOException {
+    ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
+    if (scanExec.canBroadcast()) {
+      /* If this table can be broadcast, all tasks in a node will share the same cache */
+      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
+          context, scanExec.getCanonicalName(), scanExec.getFragments());
+      loadRightFromCache(key);
+    } else {
+      this.tupleSlots = buildRightToHashTable();
+    }
+
+    first = false;
+  }
+
+  protected void loadRightFromCache(TableCacheKey key) throws IOException {
+    ExecutionBlockSharedResource sharedResource = context.getSharedResource();
+    synchronized (sharedResource.getLock()) {
+      if (sharedResource.hasBroadcastCache(key)) {
+        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+        this.tupleSlots = data.getData();
+        this.cachedRightTableStats = data.getTableStats();
+      } else {
+        CacheHolder.BroadcastCacheHolder holder =
+            new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
+        sharedResource.addBroadcastCache(key, holder);
+        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+        this.tupleSlots = data.getData();
+        this.cachedRightTableStats = data.getTableStats();
+      }
+    }
+  }
+
+  private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
+    Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
 
     while (!context.isStopped() && (tuple = rightChild.next()) != null) {
       keyTuple = new VTuple(joinKeyPairs.size());
@@ -160,18 +197,18 @@ public class HashJoinExec extends BinaryPhysicalExec {
         keyTuple.put(i, tuple.get(rightKeyList[i]));
       }
 
-      List<Tuple> newValue = tupleSlots.get(keyTuple);
+      List<Tuple> newValue = map.get(keyTuple);
 
       if (newValue != null) {
         newValue.add(tuple);
       } else {
         newValue = new ArrayList<Tuple>();
         newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
+        map.put(keyTuple, newValue);
       }
     }
 
-    first = false;
+    return map;
   }
 
   @Override
@@ -219,7 +256,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
       inputStats.setNumRows(leftInputStats.getNumRows());
     }
 
-    TableStats rightInputStats = rightChild.getInputStats();
+    TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
     if (rightInputStats != null) {
       inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
       inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 233ef92..e78cb20 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -22,7 +22,10 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCacheKey;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.catalog.SchemaUtil;
@@ -33,6 +36,7 @@ import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -66,6 +70,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   protected Projector projector;
 
   private int rightNumCols;
+  private TableStats cachedRightTableStats;
   private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
 
   public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
@@ -91,8 +96,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
       this.joinFilter = null;
     }
 
-    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
     // HashJoin only can manage equi join key pairs.
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(),
         rightChild.getSchema(), false);
@@ -201,8 +204,41 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   }
 
   protected void loadRightToHashTable() throws IOException {
+    ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
+    if (scanExec.canBroadcast()) {
+      /* If this table can be broadcast, all tasks in a node will share the same cache */
+      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
+          context, scanExec.getCanonicalName(), scanExec.getFragments());
+      loadRightFromCache(key);
+    } else {
+      this.tupleSlots = buildRightToHashTable();
+    }
+
+    first = false;
+  }
+
+  protected void loadRightFromCache(TableCacheKey key) throws IOException {
+    ExecutionBlockSharedResource sharedResource = context.getSharedResource();
+    synchronized (sharedResource.getLock()) {
+      if (sharedResource.hasBroadcastCache(key)) {
+        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+        this.tupleSlots = data.getData();
+        this.cachedRightTableStats = data.getTableStats();
+      } else {
+        CacheHolder.BroadcastCacheHolder holder =
+            new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
+        sharedResource.addBroadcastCache(key, holder);
+        CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+        this.tupleSlots = data.getData();
+        this.cachedRightTableStats = data.getTableStats();
+      }
+    }
+  }
+
+  private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
+    Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
 
     while (!context.isStopped() && (tuple = rightChild.next()) != null) {
       keyTuple = new VTuple(joinKeyPairs.size());
@@ -210,16 +246,18 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
         keyTuple.put(i, tuple.get(rightKeyList[i]));
       }
 
-      List<Tuple> newValue = tupleSlots.get(keyTuple);
+      List<Tuple> newValue = map.get(keyTuple);
+
       if (newValue != null) {
         newValue.add(tuple);
       } else {
         newValue = new ArrayList<Tuple>();
         newValue.add(tuple);
-        tupleSlots.put(keyTuple, newValue);
+        map.put(keyTuple, newValue);
       }
     }
-    first = false;
+
+    return map;
   }
 
   @Override
@@ -250,5 +288,31 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   public JoinNode getPlan() {
     return this.plan;
   }
+
+  @Override
+  public TableStats getInputStats() {
+    if (leftChild == null) {
+      return inputStats;
+    }
+    TableStats leftInputStats = leftChild.getInputStats();
+    inputStats.setNumBytes(0);
+    inputStats.setReadBytes(0);
+    inputStats.setNumRows(0);
+
+    if (leftInputStats != null) {
+      inputStats.setNumBytes(leftInputStats.getNumBytes());
+      inputStats.setReadBytes(leftInputStats.getReadBytes());
+      inputStats.setNumRows(leftInputStats.getNumRows());
+    }
+
+    TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
+    if (rightInputStats != null) {
+      inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+      inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+      inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+    }
+
+    return inputStats;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 5692308..a1eaa48 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -34,7 +34,7 @@ import java.util.List;
 /**
  * A Scanner that reads multiple partitions
  */
-public class PartitionMergeScanExec extends PhysicalExec {
+public class PartitionMergeScanExec extends ScanExec {
   private final ScanNode plan;
   private SeqScanExec currentScanner = null;
 
@@ -56,14 +56,16 @@ public class PartitionMergeScanExec extends PhysicalExec {
     inputStats = new TableStats();
   }
 
+  @Override
   public void init() throws IOException {
     for (CatalogProtos.FragmentProto fragment : fragments) {
       SeqScanExec scanExec = new SeqScanExec(context, (ScanNode) PlannerUtil.clone(null, plan),
-          new CatalogProtos.FragmentProto[] {fragment});
+          new CatalogProtos.FragmentProto[]{fragment});
       scanners.add(scanExec);
     }
     progress = 0.0f;
     rescan();
+    super.init();
   }
 
   @Override
@@ -112,11 +114,22 @@ public class PartitionMergeScanExec extends PhysicalExec {
     progress = 1.0f;
   }
 
+  @Override
   public String getTableName() {
     return plan.getTableName();
   }
 
   @Override
+  public String getCanonicalName() {
+    return plan.getCanonicalName();
+  }
+
+  @Override
+  public CatalogProtos.FragmentProto[] getFragments() {
+    return fragments;
+  }
+
+  @Override
   public float getProgress() {
     if (iterator != null) {
       float progressSum = 0.0f;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
new file mode 100644
index 0000000..86874ba
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class ScanExec extends PhysicalExec {
+
+  /* whether this scan reads a broadcast table */
+  private boolean canBroadcast;
+
+  public ScanExec(TaskAttemptContext context, Schema inSchema, Schema outSchema) {
+    super(context, inSchema, outSchema);
+  }
+
+  public abstract String getTableName();
+
+  public abstract String getCanonicalName();
+
+  public abstract CatalogProtos.FragmentProto[] getFragments();
+
+  @Override
+  public void init() throws IOException {
+    canBroadcast = checkIfBroadcast();
+
+    super.init();
+  }
+
+  public boolean canBroadcast() {
+    return canBroadcast;
+  }
+
+  /* check whether the enforcer marks this scan's table as a broadcast table */
+  private boolean checkIfBroadcast() {
+    Enforcer enforcer = context.getEnforcer();
+
+    if (enforcer != null && enforcer.hasEnforceProperty(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST)) {
+      List<TajoWorkerProtocol.EnforceProperty> properties =
+          enforcer.getEnforceProperties(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST);
+
+      for (TajoWorkerProtocol.EnforceProperty property : properties) {
+        if (getCanonicalName().equals(property.getBroadcast().getTableName())) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c62027d..1078c80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -21,17 +21,14 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.codegen.CompilationError;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.utils.TupleCache;
-import org.apache.tajo.engine.utils.TupleCacheKey;
-import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.ConstEval;
 import org.apache.tajo.plan.expr.EvalNode;
@@ -42,18 +39,16 @@ import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 
-public class SeqScanExec extends PhysicalExec {
+public class SeqScanExec extends ScanExec {
   private ScanNode plan;
 
   private Scanner scanner = null;
@@ -66,10 +61,6 @@ public class SeqScanExec extends PhysicalExec {
 
   private TableStats inputStats;
 
-  private TupleCacheKey cacheKey;
-
-  private boolean cacheRead = false;
-
   public SeqScanExec(TaskAttemptContext context, ScanNode plan,
                      CatalogProtos.FragmentProto [] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -78,21 +69,6 @@ public class SeqScanExec extends PhysicalExec {
     this.qual = plan.getQual();
     this.fragments = fragments;
 
-    if (plan.isBroadcastTable()) {
-      String pathNameKey = "";
-      if (fragments != null) {
-        StringBuilder stringBuilder = new StringBuilder();
-        for (FragmentProto f : fragments) {
-          Fragment fragement = FragmentConvertor.convert(context.getConf(), f);
-          stringBuilder.append(fragement.getKey());
-        }
-        pathNameKey = stringBuilder.toString();
-      }
-
-      cacheKey = new TupleCacheKey(
-          context.getTaskId().getTaskId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey);
-    }
-
     if (fragments != null
         && plan.getTableDesc().hasPartition()
         && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
@@ -153,6 +129,7 @@ public class SeqScanExec extends PhysicalExec {
     }
   }
 
+  @Override
   public void init() throws IOException {
     Schema projected;
 
@@ -177,33 +154,7 @@ public class SeqScanExec extends PhysicalExec {
       projected = outSchema;
     }
 
-    if (cacheKey != null) {
-      TupleCache tupleCache = TupleCache.getInstance();
-      if (tupleCache.isBroadcastCacheReady(cacheKey)) {
-        openCacheScanner();
-      } else {
-        if (TupleCache.getInstance().lockBroadcastScan(cacheKey)) {
-          scanAndAddCache(projected);
-          openCacheScanner();
-        } else {
-          Object lockMonitor = tupleCache.getLockMonitor();
-          synchronized (lockMonitor) {
-            try {
-              lockMonitor.wait(20 * 1000);
-            } catch (InterruptedException e) {
-            }
-          }
-          if (tupleCache.isBroadcastCacheReady(cacheKey)) {
-            openCacheScanner();
-          } else {
-            initScanner(projected);
-          }
-        }
-      }
-    } else {
-      initScanner(projected);
-    }
-
+    initScanner(projected);
     super.init();
   }
 
@@ -216,7 +167,7 @@ public class SeqScanExec extends PhysicalExec {
 
   private void initScanner(Schema projected) throws IOException {
     this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
-    TableMeta meta = null;
+    TableMeta meta;
     try {
       meta = (TableMeta) plan.getTableDesc().getMeta().clone();
     } catch (CloneNotSupportedException e) {
@@ -241,35 +192,6 @@ public class SeqScanExec extends PhysicalExec {
     }
   }
 
-  private void openCacheScanner() throws IOException {
-    Scanner cacheScanner = TupleCache.getInstance().openCacheScanner(cacheKey, plan.getPhysicalSchema());
-    if (cacheScanner != null) {
-      scanner = cacheScanner;
-      cacheRead = true;
-    }
-  }
-
-  private void scanAndAddCache(Schema projected) throws IOException {
-    initScanner(projected);
-
-    List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>();
-    while (!context.isStopped()) {
-      Tuple tuple = next();
-      if (tuple != null) {
-        broadcastTupleCacheList.add(tuple);
-      } else {
-        break;
-      }
-    }
-
-    if (scanner != null) {
-      scanner.close();
-      scanner = null;
-    }
-
-    TupleCache.getInstance().addBroadcastCache(cacheKey, broadcastTupleCacheList);
-  }
-
   @Override
   public Tuple next() throws IOException {
     if (fragments == null) {
@@ -281,9 +203,6 @@ public class SeqScanExec extends PhysicalExec {
 
     if (!plan.hasQual()) {
       if ((tuple = scanner.next()) != null) {
-        if (cacheRead) {
-          return tuple;
-        }
         projector.eval(tuple, outTuple);
         outTuple.setOffset(tuple.getOffset());
         return outTuple;
@@ -292,9 +211,6 @@ public class SeqScanExec extends PhysicalExec {
       }
     } else {
       while ((tuple = scanner.next()) != null) {
-        if (cacheRead) {
-          return tuple;
-        }
         if (qual.eval(inSchema, tuple).isTrue()) {
           projector.eval(tuple, outTuple);
           return outTuple;
@@ -328,11 +244,22 @@ public class SeqScanExec extends PhysicalExec {
     projector = null;
   }
 
+  @Override
   public String getTableName() {
     return plan.getTableName();
   }
 
   @Override
+  public String getCanonicalName() {
+    return plan.getCanonicalName();
+  }
+
+  @Override
+  public CatalogProtos.FragmentProto[] getFragments() {
+    return fragments;
+  }
+
+  @Override
   public float getProgress() {
     if (scanner == null) {
       return 1.0f;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
new file mode 100644
index 0000000..6a5c0bf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import com.google.common.collect.Maps;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface CacheHolder<T> {
+
+  /**
+   * Get the shared data from the TableCache.
+   */
+  T getData();
+
+  /**
+   * Get the shared table stats from the TableCache.
+   */
+  TableStats getTableStats();
+
+  /**
+   * Release the resources held by this cache.
+   *
+   */
+  void release();
+
+  /**
+   * This is a cache holder for a join table.
+   * It is released when the execution block is finished.
+   */
+  public static class BroadcastCacheHolder implements CacheHolder<Map<Tuple, List<Tuple>>> {
+    private Map<Tuple, List<Tuple>> data;
+    private Deallocatable rowBlock;
+    private TableStats tableStats;
+
+    public BroadcastCacheHolder(Map<Tuple, List<Tuple>> data, TableStats tableStats, Deallocatable rowBlock){
+      this.data = data;
+      this.tableStats = tableStats;
+      this.rowBlock = rowBlock;
+    }
+
+    @Override
+    public Map<Tuple, List<Tuple>> getData() {
+      return Maps.newHashMap(data);
+    }
+
+    @Override
+    public TableStats getTableStats(){
+      return tableStats;
+    }
+
+    @Override
+    public void release() {
+      if(rowBlock != null) rowBlock.release();
+    }
+
+    public static TableCacheKey getCacheKey(TaskAttemptContext ctx, String canonicalName,
+                                                 CatalogProtos.FragmentProto[] fragments) throws IOException {
+      String pathNameKey = "";
+      if (fragments != null) {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (CatalogProtos.FragmentProto f : fragments) {
+          Fragment fragement = FragmentConvertor.convert(ctx.getConf(), f);
+          stringBuilder.append(fragement.getKey());
+        }
+        pathNameKey = stringBuilder.toString();
+      }
+
+      return new TableCacheKey(ctx.getTaskId().getTaskId().getExecutionBlockId().toString(), canonicalName, pathNameKey);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java
new file mode 100644
index 0000000..f2a2217
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a simple TableCache that just adds CacheHolders as needed.
+ */
+ public class TableCache {
+  public static final Log LOG = LogFactory.getLog(TableCache.class);
+
+  private static TableCache instance;
+  private Map<TableCacheKey, CacheHolder<?>> cacheMap = Maps.newHashMap();
+
+  private TableCache() {
+  }
+
+  public static synchronized TableCache getInstance() {
+    if (instance == null) {
+      instance = new TableCache();
+    }
+    return instance;
+  }
+
+  public synchronized void releaseCache(ExecutionBlockId ebId) {
+    if (ebId == null) {
+      return;
+    }
+
+    List<TableCacheKey> keys = getCacheKeyByExecutionBlockId(ebId);
+
+    for (TableCacheKey cacheKey: keys) {
+      cacheMap.remove(cacheKey).release();
+      LOG.info("Removed Broadcast Table Cache: " + cacheKey.getTableName() + " EbId: " + cacheKey.ebId);
+    }
+  }
+
+  public synchronized List<TableCacheKey> getCacheKeyByExecutionBlockId(ExecutionBlockId ebId) {
+    List<TableCacheKey> keys = Lists.newArrayList();
+    for (TableCacheKey eachKey : cacheMap.keySet()) {
+      if (eachKey.ebId.equals(ebId.toString())) {
+        keys.add(eachKey);
+      }
+    }
+    return keys;
+  }
+
+  public synchronized void addCache(TableCacheKey cacheKey,  CacheHolder<?> cacheData) {
+    cacheMap.put(cacheKey, cacheData);
+    LOG.info("Added Broadcast Table Cache: " + cacheKey.getTableName() + " EbId: " + cacheKey.ebId);
+  }
+
+  public synchronized boolean hasCache(TableCacheKey cacheKey) {
+    return cacheMap.containsKey(cacheKey);
+  }
+
+  public synchronized CacheHolder<?> getCache(TableCacheKey cacheKey) {
+    return cacheMap.get(cacheKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java
new file mode 100644
index 0000000..81a4b58
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+public class TableCacheKey {
+  String ebId;
+  String tableName;
+  String pathName;
+
+  public TableCacheKey(String ebId, String tableName, String pathName) {
+    this.ebId = ebId;
+    this.tableName = tableName;
+    this.pathName = pathName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    return toString().equals(o.toString());
+  }
+
+  @Override
+  public String toString() {
+    return ebId + "," + tableName + "," + pathName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
deleted file mode 100644
index 00647b5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TupleCache {
-  private static TupleCache instance;
-
-  private Map<TupleCacheKey, List<Tuple>> broadcastTupleCacheData
-      = new HashMap<TupleCacheKey, List<Tuple>>();
-  private Map<TupleCacheKey, TupleCacheStatus> broadcastTupleCacheStatus
-      = new HashMap<TupleCacheKey, TupleCacheStatus>();
-
-  private Object lockMonitor = new Object();
-
-  public static enum TupleCacheStatus {
-    STARTED,
-    ENDED
-  };
-
-  private TupleCache() {
-  }
-
-  public static synchronized TupleCache getInstance() {
-    if (instance == null) {
-      instance = new TupleCache();
-    }
-    return instance;
-  }
-
-  public Object getLockMonitor() {
-    return lockMonitor;
-  }
-
-  public void removeBroadcastCache(ExecutionBlockId ebId) {
-    if (ebId == null) {
-      return;
-    }
-    synchronized (lockMonitor) {
-      TupleCacheKey matchedKey = null;
-      for (TupleCacheKey eachKey: broadcastTupleCacheStatus.keySet()) {
-        if (eachKey.ebId.equals(ebId.toString())) {
-          matchedKey = eachKey;
-          break;
-        }
-      }
-      if (matchedKey != null) {
-        broadcastTupleCacheStatus.remove(matchedKey);
-        broadcastTupleCacheData.remove(matchedKey);
-      }
-    }
-  }
-
-  public void addBroadcastCache(TupleCacheKey cacheKey, List<Tuple> cacheData) {
-    synchronized (lockMonitor) {
-      if (broadcastTupleCacheStatus.containsKey(cacheKey) &&
-          broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED) {
-        return;
-      }
-      broadcastTupleCacheData.put(cacheKey, cacheData);
-      broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.ENDED);
-      lockMonitor.notifyAll();
-    }
-  }
-
-  public boolean lockBroadcastScan(TupleCacheKey cacheKey) {
-    synchronized (lockMonitor) {
-      if (broadcastTupleCacheStatus.containsKey(cacheKey)) {
-        return false;
-      } else {
-        broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.STARTED);
-        return true;
-      }
-    }
-  }
-
-  public boolean isBroadcastCacheReady(TupleCacheKey cacheKey) {
-    synchronized (lockMonitor) {
-      if (!broadcastTupleCacheStatus.containsKey(cacheKey)) {
-        return false;
-      }
-      return broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED;
-    }
-  }
-
-  public TupleCacheScanner openCacheScanner(TupleCacheKey cacheKey, Schema schema) throws IOException {
-    synchronized (lockMonitor) {
-      List<Tuple> cacheData = broadcastTupleCacheData.get(cacheKey);
-      if (cacheData != null) {
-        TupleCacheScanner scanner = new TupleCacheScanner(cacheData, schema);
-        scanner.init();
-        return scanner;
-      } else {
-        return null;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
deleted file mode 100644
index 1cb01c2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-public class TupleCacheKey {
-  String ebId;
-  String tableName;
-  String pathName;
-
-  public TupleCacheKey(String ebId, String tableName, String pathName) {
-    this.ebId = ebId;
-    this.tableName = tableName;
-    this.pathName = pathName;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  @Override
-  public int hashCode() {
-    return toString().hashCode();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    return toString().equals(o.toString());
-  }
-
-  @Override
-  public String toString() {
-    return ebId + "," + tableName + "," + pathName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index b193b24..494fd7f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -21,12 +21,16 @@ package org.apache.tajo.worker;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
 import org.apache.tajo.engine.codegen.TajoClassLoader;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCache;
+import org.apache.tajo.engine.utils.TableCacheKey;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.LogicalNode;
@@ -38,6 +42,7 @@ public class ExecutionBlockSharedResource {
   private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class);
   private AtomicBoolean initializing = new AtomicBoolean(false);
   private volatile Boolean resourceInitSuccess = Boolean.valueOf(false);
+  private final Object lock = new Object();
 
   // Query
   private QueryContext context;
@@ -108,6 +113,27 @@ public class ExecutionBlockSharedResource {
     }
   }
 
+  /* Returns the lock object owned by this shared resource, i.e. a single lock per ExecutionBlock. */
+  public synchronized Object getLock() {
+    return lock;
+  }
+
+  public boolean hasBroadcastCache(TableCacheKey key) {
+    return TableCache.getInstance().hasCache(key);
+  }
+
+  public <T extends Object> CacheHolder<T> getBroadcastCache(TableCacheKey key) {
+    return (CacheHolder<T>) TableCache.getInstance().getCache(key);
+  }
+
+  public void addBroadcastCache(TableCacheKey cacheKey,  CacheHolder<?> cacheData) {
+    TableCache.getInstance().addCache(cacheKey, cacheData);
+  }
+
+  public void releaseBroadcastCache(ExecutionBlockId id) {
+    TableCache.getInstance().releaseCache(id);
+  }
+
   public void release() {
     compilationContext = null;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 50cd20a..706e9b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 3f4a1b8..a375a31 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,7 +29,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.worker.event.TaskRunnerEvent;
 import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 import org.apache.tajo.worker.event.TaskRunnerStopEvent;
@@ -184,7 +182,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
       ExecutionBlockContext executionBlockContext =  executionBlockContextMap.remove(event.getExecutionBlockId());
       if(executionBlockContext != null){
         try {
-          TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId());
+          executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId());
           executionBlockContext.reportExecutionBlock(event.getExecutionBlockId());
           workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId());
           workerContext.getTaskHistoryWriter().flushTaskHistories();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java
new file mode 100644
index 0000000..f10f2a1
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.util;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCache;
+import org.apache.tajo.engine.utils.TableCacheKey;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Checks that concurrent tasks sharing one ExecutionBlockSharedResource end up with a single
+ * broadcast cache entry, and that releasing the execution block removes that entry.
+ */
+public class TestTableCache {
+
+  /**
+   * Submits {@code parallelCount} tasks that each try to populate the cache for the same key,
+   * asserts that every task observes the same cached value, then releases the cache and
+   * asserts that the key is gone.
+   */
+  @Test
+  public void testBroadcastTableCache() throws Exception {
+
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(
+        QueryIdFactory.newQueryId(System.currentTimeMillis(), 0));
+
+    final TableCacheKey key = new TableCacheKey(ebId.toString(), "testBroadcastTableCache", "path");
+    final ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
+
+    final int parallelCount = 30;
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+    try {
+      List<Future<CacheHolder<Long>>> tasks = new ArrayList<Future<CacheHolder<Long>>>();
+      for (int i = 0; i < parallelCount; i++) {
+        tasks.add(executor.submit(createTask(key, resource)));
+      }
+
+      long expected = tasks.get(0).get().getData().longValue();
+
+      for (Future<CacheHolder<Long>> future : tasks) {
+        assertEquals(expected, future.get().getData().longValue());
+      }
+
+      resource.releaseBroadcastCache(ebId);
+      assertFalse(resource.hasBroadcastCache(key));
+    } finally {
+      // Shut the pool down even when an assertion or a task fails, so that the worker
+      // threads never outlive the test.
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Builds a task that, while holding the resource lock, adds a holder for {@code key} if none
+   * is cached yet, and then returns whichever holder is cached for the key.
+   *
+   * @param key      cache key shared by all tasks
+   * @param resource shared resource providing the lock and the cache accessors
+   * @return the task; its result is the holder found in the cache after the check-and-add step
+   */
+  private Callable<CacheHolder<Long>> createTask(final TableCacheKey key, final ExecutionBlockSharedResource resource) {
+    return new Callable<CacheHolder<Long>>() {
+      @Override
+      public CacheHolder<Long> call() throws Exception {
+        // Check-and-add must be atomic across tasks, otherwise several tasks could each
+        // register their own holder and observe different values.
+        synchronized (resource.getLock()) {
+          if (!TableCache.getInstance().hasCache(key)) {
+            // The creation time identifies which task populated the cache.
+            final long nanoTime = System.nanoTime();
+            final TableStats tableStats = new TableStats();
+            tableStats.setNumRows(100);
+            tableStats.setNumBytes(1000);
+
+            final CacheHolder<Long> cacheHolder = new CacheHolder<Long>() {
+
+              @Override
+              public Long getData() {
+                return nanoTime;
+              }
+
+              @Override
+              public TableStats getTableStats() {
+                return tableStats;
+              }
+
+              @Override
+              public void release() {
+
+              }
+            };
+
+            resource.addBroadcastCache(key, cacheHolder);
+          }
+        }
+
+        return resource.<Long>getBroadcastCache(key);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
deleted file mode 100644
index 3d2f307..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.util;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.engine.utils.TupleCache;
-import org.apache.tajo.engine.utils.TupleCacheKey;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertFalse;
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertNotNull;
-
-public class TestTupleCache {
-  @Test
-  public void testTupleCcaheBasicFunction() throws Exception {
-    List<Tuple> tupleData = new ArrayList<Tuple>();
-    for (int i = 0; i < 100; i++) {
-      Datum[] datums = new Datum[5];
-      for (int j = 0; j < 5; j++) {
-        datums[j] = new TextDatum(i + "_" + j);
-      }
-      Tuple tuple = new VTuple(datums);
-      tupleData.add(tuple);
-    }
-
-    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(
-        QueryIdFactory.newQueryId(System.currentTimeMillis(), 0));
-
-    TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable", "test");
-    TupleCache tupleCache = TupleCache.getInstance();
-
-    assertFalse(tupleCache.isBroadcastCacheReady(cacheKey));
-    assertTrue(tupleCache.lockBroadcastScan(cacheKey));
-    assertFalse(tupleCache.lockBroadcastScan(cacheKey));
-
-    tupleCache.addBroadcastCache(cacheKey, tupleData);
-    assertTrue(tupleCache.isBroadcastCacheReady(cacheKey));
-
-    Scanner scanner = tupleCache.openCacheScanner(cacheKey, null);
-    assertNotNull(scanner);
-
-    int count = 0;
-
-    while (true) {
-      Tuple tuple = scanner.next();
-      if (tuple == null) {
-        break;
-      }
-
-      assertEquals(tupleData.get(count), tuple);
-      count++;
-    }
-
-    assertEquals(tupleData.size(), count);
-
-    tupleCache.removeBroadcastCache(ebId);
-    assertFalse(tupleCache.isBroadcastCacheReady(cacheKey));
-    assertTrue(tupleCache.lockBroadcastScan(cacheKey));
-
-    tupleCache.removeBroadcastCache(ebId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 5cbed7e..3387157 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -416,6 +416,9 @@ public class LogicalNodeDeserializer {
       scan.setQual(EvalNodeDeserializer.deserialize(context, scanProto.getQual()));
     }
 
+    if(scanProto.hasBroadcast()){
+      scan.setBroadcastTable(scanProto.getBroadcast());
+    }
     scan.setInSchema(convertSchema(protoNode.getInSchema()));
     scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index 39a13ba..1bde955 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -431,6 +431,8 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     if (scan.hasQual()) {
       scanBuilder.setQual(EvalNodeSerializer.serialize(scan.getQual()));
     }
+
+    scanBuilder.setBroadcast(scan.isBroadcastTable());
     return scanBuilder;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 3e4f07c..02f52ff 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -104,6 +104,7 @@ message ScanNode {
   required bool existTargets = 3;
   repeated Target targets = 4;
   optional EvalNodeTree qual = 5;
+  optional bool broadcast = 6;
 }
 
 message PartitionScanSpec {