You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2014/04/05 01:23:47 UTC

[3/4] TAJO-725: Broadcast JOIN should supports multiple tables. (hyoungjunkim via jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index b3aefbe..63b50ac 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -665,7 +665,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           subQuery.getMasterPlan().isLeaf(subQuery.getId()), subQuery.getId());
       subQuery.taskScheduler = TaskSchedulerFactory.get(conf, subQuery.schedulerContext, subQuery);
       subQuery.taskScheduler.init(conf);
-      LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling");
+      LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling for " + subQuery.getId());
     }
 
     /**
@@ -716,31 +716,32 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       if (parent != null && parent.getScanNodes().length == 2) {
         List<ExecutionBlock> childs = masterPlan.getChilds(parent);
 
-        // for inner
+        // for outer
         ExecutionBlock outer = childs.get(0);
         long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer);
 
         // for inner
         ExecutionBlock inner = childs.get(1);
         long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
-        LOG.info("Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+        LOG.info(subQuery.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
             + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
 
         long bigger = Math.max(outerVolume, innerVolume);
 
         int mb = (int) Math.ceil((double) bigger / 1048576);
-        LOG.info("Bigger Table's volume is approximately " + mb + " MB");
+        LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
 
         int taskNum = (int) Math.ceil((double) mb /
             conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
 
         int totalMem = getClusterTotalMemory(subQuery);
-        LOG.info("Total memory of cluster is " + totalMem + " MB");
+        LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
         int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
 
         // determine the number of task
         taskNum = Math.min(taskNum, slots);
-        LOG.info("The determined number of join partitions is " + taskNum);
+        LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
+
         return taskNum;
 
         // Is this subquery the first step of group-by?
@@ -752,17 +753,17 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
 
           int mb = (int) Math.ceil((double) volume / 1048576);
-          LOG.info("Table's volume is approximately " + mb + " MB");
+          LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
           // determine the number of task
           int taskNumBySize = (int) Math.ceil((double) mb /
               conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME));
 
           int totalMem = getClusterTotalMemory(subQuery);
 
-          LOG.info("Total memory of cluster is " + totalMem + " MB");
+          LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
           int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
           int taskNum = Math.min(taskNumBySize, slots); //Maximum partitions
-          LOG.info("The determined number of aggregation partitions is " + taskNum);
+          LOG.info(subQuery.getId() + ", The determined number of aggregation partitions is " + taskNum);
           return taskNum;
         }
       } else {
@@ -770,10 +771,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
 
         int mb = (int) Math.ceil((double)volume / 1048576);
-        LOG.info("Table's volume is approximately " + mb + " MB");
+        LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
         // determine the number of task per 128MB
         int taskNum = (int) Math.ceil((double)mb / 128);
-        LOG.info("The determined number of partitions is " + taskNum);
+        LOG.info(subQuery.getId() + ", The determined number of partitions is " + taskNum);
         return taskNum;
       }
     }
@@ -813,9 +814,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
                                       ExecutionBlock execBlock) {
       Map<String, TableDesc> tableMap = context.getTableDescMap();
       if (masterPlan.isLeaf(execBlock)) {
-        ScanNode outerScan = execBlock.getScanNodes()[0];
-        TableStats stat = tableMap.get(outerScan.getCanonicalName()).getStats();
-        return stat.getNumBytes();
+        ScanNode[] outerScans = execBlock.getScanNodes();
+        long maxVolume = 0;
+        for (ScanNode eachScanNode: outerScans) {
+          TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
+          if (stat.getNumBytes() > maxVolume) {
+            maxVolume = stat.getNumBytes();
+          }
+        }
+        return maxVolume;
       } else {
         long aggregatedVolume = 0;
         for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
@@ -895,28 +902,29 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
+  public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) {
+    subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        subQuery.getId(), fragment));
+  }
+
+
   public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> fragments) {
     for (FileFragment eachFragment : fragments) {
       scheduleFragment(subQuery, eachFragment);
     }
   }
 
-  public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) {
-    subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
-        subQuery.getId(), fragment));
-  }
-
   public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments,
-                                       FileFragment broadcastFragment) {
+                                       Collection<FileFragment> broadcastFragments) {
     for (FileFragment eachLeafFragment : leftFragments) {
-      scheduleFragment(subQuery, eachLeafFragment, broadcastFragment);
+      scheduleFragment(subQuery, eachLeafFragment, broadcastFragments);
     }
   }
 
   public static void scheduleFragment(SubQuery subQuery,
-                                      FileFragment leftFragment, FileFragment rightFragment) {
+                                      FileFragment leftFragment, Collection<FileFragment> rightFragments) {
     subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
-        subQuery.getId(), leftFragment, rightFragment));
+        subQuery.getId(), leftFragment, rightFragments));
   }
 
   public static void scheduleFetches(SubQuery subQuery, Map<String, List<URI>> fetches) {
@@ -1007,24 +1015,26 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.completedTaskCount++;
 
         if (taskEvent.getState() == TaskState.SUCCEEDED) {
-          if (task.isLeafTask()) {
-            subQuery.succeededObjectCount += task.getTotalFragmentNum();
-          } else {
-            subQuery.succeededObjectCount++;
-          }
+//          if (task.isLeafTask()) {
+//            subQuery.succeededObjectCount += task.getTotalFragmentNum();
+//          } else {
+//            subQuery.succeededObjectCount++;
+//          }
+          subQuery.succeededObjectCount++;
         } else if (task.getState() == TaskState.KILLED) {
-          if (task.isLeafTask()) {
-            subQuery.killedObjectCount += task.getTotalFragmentNum();
-          } else {
-            subQuery.killedObjectCount++;
-          }
+//          if (task.isLeafTask()) {
+//            subQuery.killedObjectCount += task.getTotalFragmentNum();
+//          } else {
+//            subQuery.killedObjectCount++;
+//          }
+          subQuery.killedObjectCount++;
         } else if (task.getState() == TaskState.FAILED) {
-          if (task.isLeafTask()) {
-            subQuery.failedObjectCount+= task.getTotalFragmentNum();
-          } else {
-            subQuery.failedObjectCount++;
-          }
-
+//          if (task.isLeafTask()) {
+//            subQuery.failedObjectCount+= task.getTotalFragmentNum();
+//          } else {
+//            subQuery.failedObjectCount++;
+//          }
+          subQuery.failedObjectCount++;
           // if at least one task is failed, try to kill all tasks.
           subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 92762c9..6f3281c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -232,6 +232,10 @@ public class TaskAttemptContext {
   }
 
   public FragmentProto getTable(String id) {
+    if (fragmentMap.get(id) == null) {
+      //for empty table
+      return null;
+    }
     return fragmentMap.get(id).get(0);
   }
 
@@ -244,6 +248,10 @@ public class TaskAttemptContext {
   }
   
   public FragmentProto [] getTables(String id) {
+    if (fragmentMap.get(id) == null) {
+      //for empty table
+      return null;
+    }
     return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
   }
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index d74e0df..9e904cd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -37,7 +37,9 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.rpc.CallFuture;
@@ -219,10 +221,8 @@ public class TaskRunner extends AbstractService {
     tasks.clear();
     fetchLauncher.shutdown();
     this.queryEngine = null;
-//    if(client != null) {
-//      client.close();
-//      client = null;
-//    }
+
+    TupleCache.getInstance().removeBroadcastCache(executionBlockId);
 
     LOG.info("Stop TaskRunner: " + executionBlockId);
     synchronized (this) {
@@ -243,10 +243,6 @@ public class TaskRunner extends AbstractService {
       return nodeId.toString();
     }
 
-//    public TajoWorkerProtocolService.Interface getMaster() {
-//      return master;
-//    }
-
     public FileSystem getLocalFS() {
       return localFS;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
index e3a356d..2e64f15 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -42,6 +42,9 @@
   Map<String, Integer> portMap = new HashMap<String, Integer>();
 
   Collection<String> queryMasters = master.getContext().getResourceManager().getQueryMasters();
+  if (queryMasters == null || queryMasters.isEmpty()) {
+    queryMasters = master.getContext().getResourceManager().getWorkers().keySet();
+  }
   for(String eachQueryMasterKey: queryMasters) {
     WorkerResource queryMaster = workers.get(eachQueryMasterKey);
     if(queryMaster != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 784fea5..e3ca39b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -206,6 +206,7 @@ public class QueryTestCaseBase {
     } catch (ServiceException e) {
       e.printStackTrace();
     }
+    testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "false");
   }
 
   protected TajoClient getClient() {
@@ -433,7 +434,8 @@ public class QueryTestCaseBase {
       if (expr.getType() == OpType.CreateTable) {
         CreateTable createTable = (CreateTable) expr;
         String tableName = createTable.getTableName();
-        assertTrue("Table creation is failed.", client.updateQuery(parsedResult.getStatement()));
+        assertTrue("Table [" + tableName + "] creation is failed.", client.updateQuery(parsedResult.getStatement()));
+
         TableDesc createdTable = client.getTableDesc(tableName);
         String createdTableName = createdTable.getName();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 10e46e0..b58116a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -619,4 +619,11 @@ public class TajoTestingCluster {
       Closeables.closeQuietly(writer);
     }
   }
+
+  public void setAllTajoDaemonConfValue(String key, String value) {
+    tajoMaster.getContext().getConf().set(key, value);
+    for (TajoWorker eachWorker: tajoWorkers) {
+      eachWorker.getConfig().set(key, value);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 5eb8af2..a2e3181 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -40,7 +40,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.sql.ResultSet;

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
new file mode 100644
index 0000000..b56ab47
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static junit.framework.Assert.assertNotNull;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBroadcastJoinPlan {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestBroadcastJoinPlan";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private Path testDir;
+
+  private TableDesc smallTable1;
+  private TableDesc smallTable2;
+  private TableDesc smallTable3;
+  private TableDesc largeTable1;
+  private TableDesc largeTable2;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    conf = util.getConfiguration();
+    conf.setLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD, 500 * 1024);
+    conf.setBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO, true);
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog = util.startCatalogCluster().getCatalog();
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    util.getMiniCatalogCluster().getCatalogServer().reloadBuiltinFunctions(TajoMaster.initBuiltinFunctions());
+
+    Schema smallTable1Schema = new Schema();
+    smallTable1Schema.addColumn("small1_id", TajoDataTypes.Type.INT4);
+    smallTable1Schema.addColumn("small1_contents", TajoDataTypes.Type.TEXT);
+    smallTable1 = makeTestData("default.small1", smallTable1Schema, 10 * 1024);
+
+    Schema smallTable2Schema = new Schema();
+    smallTable2Schema.addColumn("small2_id", TajoDataTypes.Type.INT4);
+    smallTable2Schema.addColumn("small2_contents", TajoDataTypes.Type.TEXT);
+    smallTable2 = makeTestData("default.small2", smallTable2Schema, 10 * 1024);
+
+    Schema smallTable3Schema = new Schema();
+    smallTable3Schema.addColumn("small3_id", TajoDataTypes.Type.INT4);
+    smallTable3Schema.addColumn("small3_contents", TajoDataTypes.Type.TEXT);
+    smallTable3 = makeTestData("default.small3", smallTable3Schema, 10 * 1024);
+
+    Schema largeTable1Schema = new Schema();
+    largeTable1Schema.addColumn("large1_id", TajoDataTypes.Type.INT4);
+    largeTable1Schema.addColumn("large1_contents", TajoDataTypes.Type.TEXT);
+    largeTable1 = makeTestData("default.large1", largeTable1Schema, 1024 * 1024);  //1M
+
+    Schema largeTable2Schema = new Schema();
+    largeTable2Schema.addColumn("large2_id", TajoDataTypes.Type.INT4);
+    largeTable2Schema.addColumn("large2_contents", TajoDataTypes.Type.TEXT);
+    largeTable2 = makeTestData("default.large2", largeTable2Schema, 1024 * 1024);  //1M
+
+    catalog.createTable(smallTable1);
+    catalog.createTable(smallTable2);
+    catalog.createTable(largeTable1);
+    catalog.createTable(largeTable2);
+
+    analyzer = new SQLAnalyzer();
+  }
+
+  private TableDesc makeTestData(String tableName, Schema schema, int dataSize) throws Exception {
+    TableMeta tableMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+    Path dataPath = new Path(testDir, tableName + ".csv");
+
+    String contentsData = "";
+    for (int i = 0; i < 1000; i++) {
+      for (int j = 0; j < 10; j++) {
+        contentsData += j;
+      }
+    }
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(tableMeta, schema,
+        dataPath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    int writtenSize = 0;
+    int count = 0;
+    while (true) {
+      TextDatum textDatum = DatumFactory.createText(count + "_" + contentsData);
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(count), textDatum });
+      appender.addTuple(tuple);
+
+      writtenSize += textDatum.size();
+      if (writtenSize >= dataSize) {
+        break;
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    TableDesc tableDesc = CatalogUtil.newTableDesc(tableName, schema, tableMeta, dataPath);
+    TableStats tableStats = new TableStats();
+    FileSystem fs = dataPath.getFileSystem(conf);
+    tableStats.setNumBytes(fs.getFileStatus(dataPath).getLen());
+
+    tableDesc.setStats(tableStats);
+
+    return tableDesc;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public final void testBroadcastJoin() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join small1 on large1_id = small1_id " +
+        "join small2 on small1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr =  analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395714781593_0000_000007 (TERMINAL)
+        |-eb_1395714781593_0000_000006 (ROOT)
+            |-eb_1395714781593_0000_000005 (LEAF)
+    */
+
+    ExecutionBlock terminalEB = masterPlan.getRoot();
+    assertEquals(1, masterPlan.getChildCount(terminalEB.getId()));
+
+    ExecutionBlock rootEB = masterPlan.getChild(terminalEB.getId(), 0);
+    assertEquals(1, masterPlan.getChildCount(rootEB.getId()));
+
+    ExecutionBlock leafEB = masterPlan.getChild(rootEB.getId(), 0);
+    assertNotNull(leafEB);
+
+    assertEquals(0, masterPlan.getChildCount(leafEB.getId()));
+    Collection<String> broadcastTables = leafEB.getBroadcastTables();
+    assertEquals(2, broadcastTables.size());
+
+    assertTrue(broadcastTables.contains("default.small1"));
+    assertTrue(broadcastTables.contains("default.small2"));
+    assertTrue(!broadcastTables.contains("default.large1"));
+
+    LogicalNode leafNode = leafEB.getPlan();
+    assertEquals(NodeType.GROUP_BY, leafNode.getType());
+
+    LogicalNode joinNode = ((GroupbyNode)leafNode).getChild();
+    assertEquals(NodeType.JOIN, joinNode.getType());
+
+    LogicalNode leftNode = ((JoinNode)joinNode).getLeftChild();
+    LogicalNode rightNode = ((JoinNode)joinNode).getRightChild();
+
+    assertEquals(NodeType.JOIN, leftNode.getType());
+    assertEquals(NodeType.SCAN, rightNode.getType());
+
+    LogicalNode lastLeftNode = ((JoinNode)leftNode).getLeftChild();
+    LogicalNode lastRightNode = ((JoinNode)leftNode).getRightChild();
+
+    assertEquals(NodeType.SCAN, lastLeftNode.getType());
+    assertEquals(NodeType.SCAN, lastRightNode.getType());
+  }
+
+  @Test
+  public final void testNotBroadcastJoinTwoLargeTable() throws IOException, PlanningException {
+    // This query is not broadcast join
+    String query = "select count(*) from large1 " +
+        "join large2 on large1_id = large2_id ";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr =  analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      Collection<String> broadcastTables = eb.getBroadcastTables();
+      assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+    }
+  }
+
+  @Test
+  public final void testTwoBroadcastJoin() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join small1 on large1_id = small1_id " +
+        "join large2 on large1_id = large2_id " +
+        "join small2 on large2_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr =  analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395736346625_0000_000009
+      |-eb_1395736346625_0000_000008 (GROUP-BY)
+         |-eb_1395736346625_0000_000007 (GROUP-BY, JOIN)
+           |-eb_1395736346625_0000_000006 (LEAF, JOIN)
+           |-eb_1395736346625_0000_000003 (LEAF, JOIN)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      if(index == 0) {
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+
+        assertTrue(!broadcastTables.contains("default.large1"));
+        assertTrue(broadcastTables.contains("default.small1"));
+      } else if(index == 1) {
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+        assertTrue(!broadcastTables.contains("default.large2"));
+        assertTrue(broadcastTables.contains("default.small2"));
+      }
+      index++;
+    }
+
+    assertEquals(5, index);
+  }
+
+  @Test
+  public final void testNotBroadcastJoinSubquery() throws IOException, PlanningException {
+    // This query is not broadcast join;
+    String query = "select count(*) from large1 " +
+        "join (select * from small1) a on large1_id = a.small1_id " +
+        "join small2 on a.small1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr =  analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395749810370_0000_000007
+       |-eb_1395749810370_0000_000006 (GROUP-BY)
+          |-eb_1395749810370_0000_000005 (GROUP-BY, JOIN)
+             |-eb_1395749810370_0000_000004 (LEAF, SCAN, large1)
+             |-eb_1395749810370_0000_000003 (JOIN)
+                |-eb_1395749810370_0000_000002 (LEAF, SCAN, small2)
+                |-eb_1395749810370_0000_000001 (LEAF, TABLE_SUBQUERY, small1)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      Collection<String> broadcastTables = eb.getBroadcastTables();
+      assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      index++;
+    }
+
+    assertEquals(7, index);
+  }
+
+  @Test
+  public final void testBroadcastJoinSubquery() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join (select * from small1) a on large1_id = a.small1_id " +
+        "join small2 on large1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr =  analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+    |-eb_1395794091662_0000_000007
+       |-eb_1395794091662_0000_000006
+          |-eb_1395794091662_0000_000005 (JOIN)
+             |-eb_1395794091662_0000_000004 (LEAF, SUBQUERY)
+             |-eb_1395794091662_0000_000003 (LEAF, JOIN)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      if(index == 0) {
+        //LEAF, JOIN
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+
+        assertTrue(!broadcastTables.contains("default.large1"));
+        assertTrue(broadcastTables.contains("default.small2"));
+      } else if(index == 1) {
+        //LEAF, SUBQUERY
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      } else if(index == 2) {
+        //JOIN
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      }
+      index++;
+    }
+
+    assertEquals(5, index);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
new file mode 100644
index 0000000..78714c5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.worker.TajoWorker;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertNotNull;
+
+@Category(IntegrationTest.class)
+public class TestJoinBroadcast extends QueryTestCaseBase {
+  public TestJoinBroadcast() throws Exception {
+    super(TajoConstants.DEFAULT_DATABASE_NAME);
+    testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "true");
+    testingCluster.setAllTajoDaemonConfValue(
+        TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "" + (5 * 1024));
+
+    executeDDL("create_lineitem_large_ddl.sql", "lineitem_large");
+    executeDDL("create_customer_large_ddl.sql", "customer_large");
+  }
+
+  @Test
+  public final void testCrossJoin() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testWhereClauseJoin1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testWhereClauseJoin2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testWhereClauseJoin3() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testWhereClauseJoin4() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testWhereClauseJoin5() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testWhereClauseJoin6() throws Exception {
+    ResultSet res = executeQuery();
+    System.out.println(resultSetToString(res));
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTPCHQ2Join() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoin1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithConstantExpr1() throws Exception {
+    // outer join with constant projections
+    //
+    // select c_custkey, orders.o_orderkey, 'val' as val from customer
+    // left outer join orders on c_custkey = o_orderkey;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithConstantExpr2() throws Exception {
+    // outer join with constant projections
+    //
+    // select c_custkey, o.o_orderkey, 'val' as val from customer left outer join
+    // (select * from orders) o on c_custkey = o.o_orderkey
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithConstantExpr3() throws Exception {
+    // outer join with constant projections
+    //
+    // select a.c_custkey, 123::INT8 as const_val, b.min_name from customer a
+    // left outer join ( select c_custkey, min(c_name) as min_name from customer group by c_custkey) b
+    // on a.c_custkey = b.c_custkey;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testRightOuterJoin1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testFullOuterJoin1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testJoinCoReferredEvals1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testJoinCoReferredEvalsWithSameExprs1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testJoinCoReferredEvalsWithSameExprs2() throws Exception {
+    // including grouping operator
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testCrossJoinAndCaseWhen() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testCrossJoinWithAsterisk1() throws Exception {
+    // select region.*, customer.* from region, customer;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testCrossJoinWithAsterisk2() throws Exception {
+    // select region.*, customer.* from customer, region;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testCrossJoinWithAsterisk3() throws Exception {
+    // select * from customer, region
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testCrossJoinWithAsterisk4() throws Exception {
+    // select length(r_regionkey), *, c_custkey*10 from customer, region
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testInnerJoinWithEmptyTable() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithEmptyTable1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithEmptyTable2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithEmptyTable3() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithEmptyTable4() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testRightOuterJoinWithEmptyTable1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testFullOuterJoinWithEmptyTable1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testCrossJoinWithEmptyTable1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testJoinOnMultipleDatabases() throws Exception {
+    executeString("CREATE DATABASE JOINS");
+    assertDatabaseExists("JOINS");
+    executeString("CREATE TABLE JOINS.part_ as SELECT * FROM part");
+    assertTableExists("JOINS.part_");
+    executeString("CREATE TABLE JOINS.supplier_ as SELECT * FROM supplier");
+    assertTableExists("JOINS.supplier_");
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  private MasterPlan getQueryPlan(QueryId queryId) {
+    for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) {
+      QueryMasterTask queryMasterTask = eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true);
+      if (queryMasterTask != null) {
+        return queryMasterTask.getQuery().getPlan();
+      }
+    }
+
+    fail("Can't find query from workers" + queryId);
+    return null;
+  }
+
+  @Test
+  public final void testBroadcastBasicJoin() throws Exception {
+    ResultSet res = executeQuery();
+    TajoResultSet ts = (TajoResultSet)res;
+    assertResultSet(res);
+    cleanupQuery(res);
+
+    MasterPlan plan = getQueryPlan(ts.getQueryId());
+    ExecutionBlock rootEB = plan.getRoot();
+
+    /*
+    |-eb_1395998037360_0001_000006
+       |-eb_1395998037360_0001_000005
+     */
+    assertEquals(1, plan.getChildCount(rootEB.getId()));
+
+    ExecutionBlock firstEB = plan.getChild(rootEB.getId(), 0);
+
+    assertNotNull(firstEB);
+    assertEquals(2, firstEB.getBroadcastTables().size());
+    assertTrue(firstEB.getBroadcastTables().contains("default.supplier"));
+    assertTrue(firstEB.getBroadcastTables().contains("default.part"));
+  }
+
+  @Test
+  public final void testBroadcastTwoPartJoin() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+
+    MasterPlan plan = getQueryPlan(((TajoResultSet)res).getQueryId());
+    ExecutionBlock rootEB = plan.getRoot();
+
+    /*
+    |-eb_1395996354406_0001_000010
+       |-eb_1395996354406_0001_000009
+          |-eb_1395996354406_0001_000008
+          |-eb_1395996354406_0001_000005
+     */
+    assertEquals(1, plan.getChildCount(rootEB.getId()));
+
+    ExecutionBlock firstJoinEB = plan.getChild(rootEB.getId(), 0);
+    assertNotNull(firstJoinEB);
+    assertEquals(NodeType.JOIN, firstJoinEB.getPlan().getType());
+    assertEquals(0, firstJoinEB.getBroadcastTables().size());
+
+    ExecutionBlock leafEB1 = plan.getChild(firstJoinEB.getId(), 0);
+    assertTrue(leafEB1.getBroadcastTables().contains("default.orders"));
+    assertTrue(leafEB1.getBroadcastTables().contains("default.part"));
+
+    ExecutionBlock leafEB2 = plan.getChild(firstJoinEB.getId(), 1);
+    assertTrue(leafEB2.getBroadcastTables().contains("default.nation"));
+  }
+
+  @Test
+  public final void testBroadcastSubquery() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testBroadcastSubquery2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 19591ea..6d72ca7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.query;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -223,6 +224,18 @@ public class TestJoinQuery extends QueryTestCaseBase {
 
   @Test
   public final void testLeftOuterJoinWithEmptyTable1() throws Exception {
+    /*
+    select
+      c_custkey,
+      empty_orders.o_orderkey,
+      empty_orders.o_orderstatus,
+      empty_orders.o_orderdate
+    from
+      customer left outer join empty_orders on c_custkey = o_orderkey
+    order by
+      c_custkey, o_orderkey;
+     */
+
     ResultSet res = executeQuery();
     assertResultSet(res);
     cleanupQuery(res);

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
new file mode 100644
index 0000000..1cbbdf9
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.util;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.engine.utils.TupleCache;
+import org.apache.tajo.engine.utils.TupleCacheKey;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+public class TestTupleCache {
+  @Test
+  public void testTupleCcaheBasicFunction() throws Exception {
+    List<Tuple> tupleData = new ArrayList<Tuple>();
+    for (int i = 0; i < 100; i++) {
+      Datum[] datums = new Datum[5];
+      for (int j = 0; j < 5; j++) {
+        datums[j] = new TextDatum(i + "_" + j);
+      }
+      Tuple tuple = new VTuple(datums);
+      tupleData.add(tuple);
+    }
+
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(
+        QueryIdFactory.newQueryId(System.currentTimeMillis(), 0));
+
+    TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable");
+    TupleCache tupleCache = TupleCache.getInstance();
+
+    assertFalse(tupleCache.isBroadcastCacheReady(cacheKey));
+    assertTrue(tupleCache.lockBroadcastScan(cacheKey));
+    assertFalse(tupleCache.lockBroadcastScan(cacheKey));
+
+    tupleCache.addBroadcastCache(cacheKey, tupleData);
+    assertTrue(tupleCache.isBroadcastCacheReady(cacheKey));
+
+    Scanner scanner = tupleCache.openCacheScanner(cacheKey, null);
+    assertNotNull(scanner);
+
+    int count = 0;
+
+    while (true) {
+      Tuple tuple = scanner.next();
+      if (tuple == null) {
+        break;
+      }
+
+      assertEquals(tupleData.get(count), tuple);
+      count++;
+    }
+
+    assertEquals(tupleData.size(), count);
+
+    tupleCache.removeBroadcastCache(ebId);
+    assertFalse(tupleCache.isBroadcastCacheReady(cacheKey));
+    assertTrue(tupleCache.lockBroadcastScan(cacheKey));
+
+    tupleCache.removeBroadcastCache(ebId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 869d199..ab31c8d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -58,6 +58,8 @@ public class TestExecutionBlockCursor {
     util.startCatalogCluster();
 
     conf = util.getConfiguration();
+    conf.set(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "false");
+
     catalog = util.getMiniCatalogCluster().getCatalog();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, "hdfs://localhost:!234/warehouse");
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
@@ -88,7 +90,9 @@ public class TestExecutionBlockCursor {
   @AfterClass
   public static void tearDown() {
     util.shutdownCatalogCluster();
-    dispatcher.stop();
+    if (dispatcher != null) {
+      dispatcher.stop();
+    }
   }
 
   @Test
@@ -114,6 +118,6 @@ public class TestExecutionBlockCursor {
       count++;
     }
 
-    assertEquals(6, count);
+    assertEquals(10, count);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
index fe8c070..07b4ac5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
@@ -86,11 +86,11 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase {
 
       res = executeQuery();
 
-      long[] expectedNumRows = new long[]{7, 2, 2, 2};
-      long[] expectedNumBytes = new long[]{63, 34, 34, 18};
-      long[] expectedReadBytes = new long[]{63, 0, 34, 0};
+      long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2};
+      long[] expectedNumBytes = new long[]{18, 34, 45, 75, 109, 34, 34, 18};
+      long[] expectedReadBytes = new long[]{18, 0, 45, 0, 109, 0, 34, 0};
 
-      assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
+      assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
       cleanupQuery(res);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/customer_large/customer.tbl
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/customer_large/customer.tbl b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/customer_large/customer.tbl
new file mode 100644
index 0000000..cdccf00
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/customer_large/customer.tbl
@@ -0,0 +1,100 @@
+1|Customer#000000001|IVhzIApeRb ot,c,E|10|25-989-741-2988|711.56|BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e
+2|Customer#000000002|XSTf4,NCwDVaWNe6tEgvwfmRchLXak|7|23-768-687-3665|121.65|AUTOMOBILE|l accounts. blithely ironic theodolites integrate boldly: caref
+3|Customer#000000003|MG9kdTD2WBHm|7|11-719-748-3364|7498.12|AUTOMOBILE| deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov
+4|Customer#000000004|XxVSJsLAGtn|10|14-128-190-5944|2866.83|MACHINERY| requests. final, regular ideas sleep final accou
+5|Customer#000000005|KvpyuHCplrB84WgAiGV6sYpZq7Tj|5|13-750-942-6364|794.47|HOUSEHOLD|n accounts will have to unwind. foxes cajole accor
+6|Customer#000000006|sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn|17|30-114-968-4951|7638.57|AUTOMOBILE|tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
+7|Customer#000000007|TcGe5gaZNgVePxU5kRrvXBfkasDTea|21|28-190-982-9759|9561.95|AUTOMOBILE|ainst the ironic, express theodolites. express, even pinto beans among the exp
+8|Customer#000000008|I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5|4|27-147-574-9335|6819.74|BUILDING|among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly along the ide
+9|Customer#000000009|xKiAFTjUsCuxfeleNqefumTrjS|16|18-338-906-3675|8324.07|FURNITURE|r theodolites according to the requests wake thinly excuses: pending requests haggle furiousl
+10|Customer#000000010|6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2|20|15-741-346-9870|2753.54|HOUSEHOLD|es regular deposits haggle. fur
+11|Customer#000000011|PkWS 3HlXqwTuzrKg633BEi|19|33-464-151-3439|-272.60|BUILDING|ckages. requests sleep slyly. quickly even pinto beans promise above the slyly regular pinto beans. 
+12|Customer#000000012|9PWKuhzT4Zr1Q|16|23-791-276-1263|3396.49|HOUSEHOLD| to the carefully final braids. blithely regular requests nag. ironic theodolites boost quickly along
+13|Customer#000000013|nsXQu0oVjD7PM659uC3SRSp|9|13-761-547-5974|3857.34|BUILDING|ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely
+14|Customer#000000014|KXkletMlL2JQEA |22|11-845-129-3851|5266.30|FURNITURE|, ironic packages across the unus
+15|Customer#000000015|YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn|19|33-687-542-7601|2788.52|HOUSEHOLD| platelets. regular deposits detect asymptotes. blithely unusual packages nag slyly at the fluf
+16|Customer#000000016|cYiaeMLZSMAOQ2 d0W,|17|20-781-609-3107|4681.03|FURNITURE|kly silent courts. thinly regular theodolites sleep fluffily after 
+17|Customer#000000017|izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7|12|12-970-682-3487|6.34|AUTOMOBILE|packages wake! blithely even pint
+18|Customer#000000018|3txGO AiuFux3zT0Z9NYaFRnZt|3|16-155-215-1315|5494.43|BUILDING|s sleep. carefully even instructions nag furiously alongside of t
+19|Customer#000000019|uc,3bHIx84H,wdrmLOjVsiqXCq2tr|10|28-396-526-5053|8914.71|HOUSEHOLD| nag. furiously careful packages are slyly at the accounts. furiously regular in
+20|Customer#000000020|JrPk8Pqplj4Ne|23|32-957-234-8742|7603.40|FURNITURE|g alongside of the special excuses-- fluffily enticing packages wake 
+21|Customer#000000021|XYmVpr9yAHDEn|19|18-902-614-8344|1428.25|MACHINERY| quickly final accounts integrate blithely furiously u
+22|Customer#000000022|QI6p41,FNs5k7RZoCCVPUTkUdYpB|17|13-806-545-9701|591.98|MACHINERY|s nod furiously above the furiously ironic ideas.
+23|Customer#000000023|OdY W13N7Be3OC5MpgfmcYss0Wn6TKT|15|13-312-472-8245|3332.02|HOUSEHOLD|deposits. special deposits cajole slyly. fluffily special deposits about the furiously 
+24|Customer#000000024|HXAFgIAyjxtdqwimt13Y3OZO 4xeLe7U8PqG|19|23-127-851-8031|9255.67|MACHINERY|into beans. fluffily final ideas haggle fluffily
+25|Customer#000000025|Hp8GyFQgGHFYSilH5tBfe|7|22-603-468-3533|7133.70|FURNITURE|y. accounts sleep ruthlessly according to the regular theodolites. unusual instructions sleep. ironic, final
+26|Customer#000000026|8ljrc5ZeMl7UciP|4|32-363-455-4837|5182.05|AUTOMOBILE|c requests use furiously ironic requests. slyly ironic dependencies us
+27|Customer#000000027|IS8GIyxpBrLpMT0u7|7|13-137-193-2709|5679.84|BUILDING| about the carefully ironic pinto beans. accoun
+28|Customer#000000028|iVyg0daQ,Tha8x2WPWA9m2529m|12|18-774-241-1462|1007.18|FURNITURE| along the regular deposits. furiously final pac
+29|Customer#000000029|sJ5adtfyAkCK63df2,vF25zyQMVYE34uh|13|10-773-203-7342|7618.27|FURNITURE|its after the carefully final platelets x-ray against 
+30|Customer#000000030|nJDsELGAavU63Jl0c5NKsKfL8rIJQQkQnYL2QJY|13|11-764-165-5076|9321.01|BUILDING|lithely final requests. furiously unusual account
+31|Customer#000000031|LUACbO0viaAv6eXOAebryDB xjVst|21|33-197-837-7094|5236.89|HOUSEHOLD|s use among the blithely pending depo
+32|Customer#000000032|jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J|15|25-430-914-2194|3471.53|BUILDING|cial ideas. final, furious requests across the e
+33|Customer#000000033|qFSlMuLucBmx9xnn5ib2csWUweg D|4|27-375-391-1280|-78.56|AUTOMOBILE|s. slyly regular accounts are furiously. carefully pending requests
+34|Customer#000000034|Q6G9wZ6dnczmtOx509xgE,M2KV|11|25-344-968-5422|8589.70|HOUSEHOLD|nder against the even, pending accounts. even
+35|Customer#000000035|TEjWGE4nBzJL2|22|27-566-888-7431|1228.24|HOUSEHOLD|requests. special, express requests nag slyly furiousl
+36|Customer#000000036|3TvCzjuPzpJ0,DdJ8kW5U|15|31-704-669-5769|4987.27|BUILDING|haggle. enticing, quiet platelets grow quickly bold sheaves. carefully regular acc
+37|Customer#000000037|7EV4Pwh,3SboctTWt|11|18-385-235-7162|-917.75|FURNITURE|ilent packages are carefully among the deposits. furiousl
+38|Customer#000000038|a5Ee5e9568R8RLP 2ap7|14|22-306-880-7212|6345.11|HOUSEHOLD|lar excuses. closely even asymptotes cajole blithely excuses. carefully silent pinto beans sleep carefully fin
+39|Customer#000000039|nnbRg,Pvy33dfkorYE FdeZ60|3|12-387-467-6509|6264.31|AUTOMOBILE|tions. slyly silent excuses slee
+40|Customer#000000040|gOnGWAyhSV1ofv|1|13-652-915-8939|1335.30|BUILDING|rges impress after the slyly ironic courts. foxes are. blithely 
+41|Customer#000000041|IM9mzmyoxeBmvNw8lA7G3Ydska2nkZF|0|20-917-711-4011|270.95|HOUSEHOLD|ly regular accounts hang bold, silent packages. unusual foxes haggle slyly above the special, final depo
+42|Customer#000000042|ziSrvyyBke|12|15-416-330-4175|8727.01|BUILDING|ssly according to the pinto beans: carefully special requests across the even, pending accounts wake special
+43|Customer#000000043|ouSbjHk8lh5fKX3zGso3ZSIj9Aa3PoaFd|1|29-316-665-2897|9904.28|MACHINERY|ial requests: carefully pending foxes detect quickly. carefully final courts cajole quickly. carefully
+44|Customer#000000044|Oi,dOSPwDu4jo4x,,P85E0dmhZGvNtBwi|2|26-190-260-5375|7315.94|AUTOMOBILE|r requests around the unusual, bold a
+45|Customer#000000045|4v3OcpFgoOmMG,CbnF,4mdC|2|19-715-298-9917|9983.38|AUTOMOBILE|nto beans haggle slyly alongside of t
+46|Customer#000000046|eaTXWWm10L9|3|16-357-681-2007|5744.59|AUTOMOBILE|ctions. accounts sleep furiously even requests. regular, regular accounts cajole blithely around the final pa
+47|Customer#000000047|b0UgocSqEW5 gdVbhNT|18|12-427-271-9466|274.58|BUILDING|ions. express, ironic instructions sleep furiously ironic ideas. furi
+48|Customer#000000048|0UU iPhBupFvemNB|15|10-508-348-5882|3792.50|BUILDING|re fluffily pending foxes. pending, bold platelets sleep slyly. even platelets cajo
+49|Customer#000000049|cNgAeX7Fqrdf7HQN9EwjUa4nxT,68L FKAxzl|0|20-908-631-4424|4573.94|FURNITURE|nusual foxes! fluffily pending packages maintain to the regular 
+50|Customer#000000050|9SzDYlkzxByyJ1QeTI o|18|16-658-112-3221|4266.13|MACHINERY|ts. furiously ironic accounts cajole furiously slyly ironic dinos.
+51|Customer#000000051|uR,wEaiTvo4|14|22-344-885-4251|855.87|FURNITURE|eposits. furiously regular requests integrate carefully packages. furious
+52|Customer#000000052|7 QOqGqqSy9jfV51BC71jcHJSD0|9|21-186-284-5998|5630.28|HOUSEHOLD|ic platelets use evenly even accounts. stealthy theodolites cajole furiou
+53|Customer#000000053|HnaxHzTfFTZs8MuCpJyTbZ47Cm4wFOOgib|10|25-168-852-5363|4113.64|HOUSEHOLD|ar accounts are. even foxes are blithely. fluffily pending deposits boost
+54|Customer#000000054|,k4vf 5vECGWFy,hosTE,|4|14-776-370-4745|868.90|AUTOMOBILE|sual, silent accounts. furiously express accounts cajole special deposits. final, final accounts use furi
+55|Customer#000000055|zIRBR4KNEl HzaiV3a i9n6elrxzDEh8r8pDom|12|20-180-440-8525|4572.11|MACHINERY|ully unusual packages wake bravely bold packages. unusual requests boost deposits! blithely ironic packages ab
+56|Customer#000000056|BJYZYJQk4yD5B|19|20-895-685-6920|6530.86|FURNITURE|. notornis wake carefully. carefully fluffy requests are furiously even accounts. slyly expre
+57|Customer#000000057|97XYbsuOPRXPWU|13|31-835-306-1650|4151.93|AUTOMOBILE|ove the carefully special packages. even, unusual deposits sleep slyly pend
+58|Customer#000000058|g9ap7Dk1Sv9fcXEWjpMYpBZIRUohi T|9|23-244-493-2508|6478.46|HOUSEHOLD|ideas. ironic ideas affix furiously express, final instructions. regular excuses use quickly e
+59|Customer#000000059|zLOCP0wh92OtBihgspOGl4|9|11-355-584-3112|3458.60|MACHINERY|ously final packages haggle blithely after the express deposits. furiou
+60|Customer#000000060|FyodhjwMChsZmUz7Jz0H|1|22-480-575-5866|2741.87|MACHINERY|latelets. blithely unusual courts boost furiously about the packages. blithely final instruct
+61|Customer#000000061|9kndve4EAJxhg3veF BfXr7AqOsT39o gtqjaYE|13|27-626-559-8599|1536.24|FURNITURE|egular packages shall have to impress along the 
+62|Customer#000000062|upJK2Dnw13,|18|17-361-978-7059|595.61|MACHINERY|kly special dolphins. pinto beans are slyly. quickly regular accounts are furiously a
+63|Customer#000000063|IXRSpVWWZraKII|20|31-952-552-9584|9331.13|AUTOMOBILE|ithely even accounts detect slyly above the fluffily ir
+64|Customer#000000064|MbCeGY20kaKK3oalJD,OT|15|13-558-731-7204|-646.64|BUILDING|structions after the quietly ironic theodolites cajole be
+65|Customer#000000065|RGT yzQ0y4l0H90P783LG4U95bXQFDRXbWa1sl,X|10|33-733-623-5267|8795.16|AUTOMOBILE|y final foxes serve carefully. theodolites are carefully. pending i
+66|Customer#000000066|XbsEqXH1ETbJYYtA1A|1|32-213-373-5094|242.77|HOUSEHOLD|le slyly accounts. carefully silent packages benea
+67|Customer#000000067|rfG0cOgtr5W8 xILkwp9fpCS8|0|19-403-114-4356|8166.59|MACHINERY|indle furiously final, even theodo
+68|Customer#000000068|o8AibcCRkXvQFh8hF,7o|3|22-918-832-2411|6853.37|HOUSEHOLD| pending pinto beans impress realms. final dependencies 
+69|Customer#000000069|Ltx17nO9Wwhtdbe9QZVxNgP98V7xW97uvSH1prEw|13|19-225-978-5670|1709.28|HOUSEHOLD|thely final ideas around the quickly final dependencies affix carefully quickly final theodolites. final accounts c
+70|Customer#000000070|mFowIuhnHjp2GjCiYYavkW kUwOjIaTCQ|3|32-828-107-2832|4867.52|FURNITURE|fter the special asymptotes. ideas after the unusual frets cajole quickly regular pinto be
+71|Customer#000000071|TlGalgdXWBmMV,6agLyWYDyIz9MKzcY8gl,w6t1B|16|17-710-812-5403|-611.19|HOUSEHOLD|g courts across the regular, final pinto beans are blithely pending ac
+72|Customer#000000072|putjlmskxE,zs,HqeIA9Wqu7dhgH5BVCwDwHHcf|8|12-759-144-9689|-362.86|FURNITURE|ithely final foxes sleep always quickly bold accounts. final wat
+73|Customer#000000073|8IhIxreu4Ug6tt5mog4|4|10-473-439-3214|4288.50|BUILDING|usual, unusual packages sleep busily along the furiou
+74|Customer#000000074|IkJHCA3ZThF7qL7VKcrU nRLl,kylf |11|14-199-862-7209|2764.43|MACHINERY|onic accounts. blithely slow packages would haggle carefully. qui
+75|Customer#000000075|Dh 6jZ,cwxWLKQfRKkiGrzv6pm|2|28-247-803-9025|6684.10|AUTOMOBILE| instructions cajole even, even deposits. finally bold deposits use above the even pains. slyl
+76|Customer#000000076|m3sbCvjMOHyaOofH,e UkGPtqc4|8|10-349-718-3044|5745.33|FURNITURE|pecial deposits. ironic ideas boost blithely according to the closely ironic theodolites! furiously final deposits n
+77|Customer#000000077|4tAE5KdMFGD4byHtXF92vx|1|27-269-357-4674|1738.87|BUILDING|uffily silent requests. carefully ironic asymptotes among the ironic hockey players are carefully bli
+78|Customer#000000078|HBOta,ZNqpg3U2cSL0kbrftkPwzX|5|19-960-700-9191|7136.97|FURNITURE|ests. blithely bold pinto beans h
+79|Customer#000000079|n5hH2ftkVRwW8idtD,BmM2|2|25-147-850-4166|5121.28|MACHINERY|es. packages haggle furiously. regular, special requests poach after the quickly express ideas. blithely pending re
+80|Customer#000000080|K,vtXp8qYB |4|10-267-172-7101|7383.53|FURNITURE|tect among the dependencies. bold accounts engage closely even pinto beans. ca
+81|Customer#000000081|SH6lPA7JiiNC6dNTrR|17|30-165-277-3269|2023.71|BUILDING|r packages. fluffily ironic requests cajole fluffily. ironically regular theodolit
+82|Customer#000000082|zhG3EZbap4c992Gj3bK,3Ne,Xn|3|28-159-442-5305|9468.34|AUTOMOBILE|s wake. bravely regular accounts are furiously. regula
+83|Customer#000000083|HnhTNB5xpnSF20JBH4Ycs6psVnkC3RDf|7|32-817-154-4122|6463.51|BUILDING|ccording to the quickly bold warhorses. final, regular foxes integrate carefully. bold packages nag blithely ev
+84|Customer#000000084|lpXz6Fwr9945rnbtMc8PlueilS1WmASr CB|19|21-546-818-3802|5174.71|FURNITURE|ly blithe foxes. special asymptotes haggle blithely against the furiously regular depo
+85|Customer#000000085|siRerlDwiolhYR 8FgksoezycLj|0|15-745-585-8219|3386.64|FURNITURE|ronic ideas use above the slowly pendin
+86|Customer#000000086|US6EGGHXbTTXPL9SBsxQJsuvy|15|10-677-951-2353|3306.32|HOUSEHOLD|quests. pending dugouts are carefully aroun
+87|Customer#000000087|hgGhHVSWQl 6jZ6Ev|2|33-869-884-7053|6327.54|FURNITURE|hely ironic requests integrate according to the ironic accounts. slyly regular pla
+88|Customer#000000088|wtkjBN9eyrFuENSMmMFlJ3e7jE5KXcg|14|26-516-273-2566|8031.44|AUTOMOBILE|s are quickly above the quickly ironic instructions; even requests about the carefully final deposi
+89|Customer#000000089|dtR, y9JQWUO6FoJExyp8whOU|17|24-394-451-5404|1530.76|FURNITURE|counts are slyly beyond the slyly final accounts. quickly final ideas wake. r
+90|Customer#000000090|QxCzH7VxxYUWwfL7|14|26-603-491-1238|7354.23|BUILDING|sly across the furiously even 
+91|Customer#000000091|S8OMYFrpHwoNHaGBeuS6E 6zhHGZiprw1b7 q|2|18-239-400-3677|4643.14|AUTOMOBILE|onic accounts. fluffily silent pinto beans boost blithely according to the fluffily exp
+92|Customer#000000092|obP PULk2LH LqNF,K9hcbNqnLAkJVsl5xqSrY,|12|12-446-416-8471|1182.91|MACHINERY|. pinto beans hang slyly final deposits. ac
+93|Customer#000000093|EHXBr2QGdh|19|17-359-388-5266|2182.52|MACHINERY|press deposits. carefully regular platelets r
+94|Customer#000000094|IfVNIN9KtkScJ9dUjK3Pg5gY1aFeaXewwf|5|19-953-499-8833|5500.11|HOUSEHOLD|latelets across the bold, final requests sleep according to the fluffily bold accounts. unusual deposits amon
+95|Customer#000000095|EU0xvmWvOmUUn5J,2z85DQyG7QCJ9Xq7|5|25-923-255-2929|5327.38|MACHINERY|ithely. ruthlessly final requests wake slyly alongside of the furiously silent pinto beans. even the
+96|Customer#000000096|vWLOrmXhRR|7|18-422-845-1202|6323.92|AUTOMOBILE|press requests believe furiously. carefully final instructions snooze carefully. 
+97|Customer#000000097|OApyejbhJG,0Iw3j rd1M|16|27-588-919-5638|2164.48|AUTOMOBILE|haggle slyly. bold, special ideas are blithely above the thinly bold theo
+98|Customer#000000098|7yiheXNSpuEAwbswDW|11|22-885-845-6889|-551.37|BUILDING|ages. furiously pending accounts are quickly carefully final foxes: busily pe
+99|Customer#000000099|szsrOiPtCHVS97Lt|9|25-515-237-9232|4088.65|HOUSEHOLD|cajole slyly about the regular theodolites! furiously bold requests nag along the pending, regular packages. somas
+100|Customer#000000100|fptUABXcmkC5Wx|17|30-749-445-4907|9889.89|FURNITURE|was furiously fluffily quiet deposits. silent, pending requests boost against 

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/lineitem_large/lineitem.tbl
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/lineitem_large/lineitem.tbl b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/lineitem_large/lineitem.tbl
new file mode 100644
index 0000000..e637c73
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestJoinBroadcast/lineitem_large/lineitem.tbl
@@ -0,0 +1,97 @@
+1|3|4|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the
+1|1|4|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold 
+1|3|4|3|8|13309.60|0.10|0.02|N|O|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep
+1|2|3|4|28|28955.64|0.09|0.06|N|O|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de
+1|1|4|5|24|22824.48|0.10|0.04|N|O|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re
+1|1|2|6|32|49620.16|0.07|0.02|N|O|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex
+2|2|2|1|38|44694.46|0.00|0.05|N|O|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a
+3|1|3|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco
+3|2|2|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve
+3|3|2|3|27|39890.88|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. 
+3|1|3|4|2|2618.76|0.01|0.06|A|F|1993-12-04|1994-01-07|1994-01-01|NONE|TRUCK|y. fluffily pending d
+3|3|2|5|28|32986.52|0.04|0.00|R|F|1993-12-14|1994-01-10|1994-01-01|TAKE BACK RETURN|FOB|ages nag slyly pending
+3|3|4|6|26|28733.64|0.10|0.02|A|F|1993-10-29|1993-12-18|1993-11-04|TAKE BACK RETURN|RAIL|ges sleep after the caref
+4|1|4|1|30|30690.90|0.03|0.08|N|O|1996-01-10|1995-12-14|1996-01-18|DELIVER IN PERSON|REG AIR|- quickly regular packages sleep. idly
+5|2|2|1|15|23678.55|0.02|0.04|R|F|1994-10-31|1994-08-31|1994-11-20|NONE|AIR|ts wake furiously 
+5|3|2|2|26|50723.92|0.07|0.08|R|F|1994-10-16|1994-09-25|1994-10-19|NONE|FOB|sts use slyly quickly special instruc
+5|1|3|3|50|73426.50|0.08|0.03|A|F|1994-08-08|1994-10-13|1994-08-26|DELIVER IN PERSON|AIR|eodolites. fluffily unusual
+6|1|4|1|37|61998.31|0.08|0.03|A|F|1992-04-27|1992-05-15|1992-05-02|TAKE BACK RETURN|TRUCK|p furiously special foxes
+7|1|3|1|12|13608.60|0.07|0.03|N|O|1996-05-07|1996-03-13|1996-06-03|TAKE BACK RETURN|FOB|ss pinto beans wake against th
+7|2|2|2|9|11594.16|0.08|0.08|N|O|1996-02-01|1996-03-02|1996-02-19|TAKE BACK RETURN|SHIP|es. instructions
+7|3|4|3|46|81639.88|0.10|0.07|N|O|1996-01-15|1996-03-27|1996-02-03|COLLECT COD|MAIL| unusual reques
+7|1|2|4|28|31809.96|0.03|0.04|N|O|1996-03-21|1996-04-08|1996-04-20|NONE|FOB|. slyly special requests haggl
+7|2|3|5|38|73943.82|0.08|0.01|N|O|1996-02-11|1996-02-24|1996-02-18|DELIVER IN PERSON|TRUCK|ns haggle carefully ironic deposits. bl
+7|1|4|6|35|43058.75|0.06|0.03|N|O|1996-01-16|1996-02-23|1996-01-22|TAKE BACK RETURN|FOB|jole. excuses wake carefully alongside of 
+7|2|2|7|5|6476.15|0.04|0.02|N|O|1996-02-10|1996-03-26|1996-02-13|NONE|FOB|ithely regula
+32|3|3|1|28|47227.60|0.05|0.08|N|O|1995-10-23|1995-08-27|1995-10-26|TAKE BACK RETURN|TRUCK|sleep quickly. req
+32|3|4|2|32|64605.44|0.02|0.00|N|O|1995-08-14|1995-10-07|1995-08-27|COLLECT COD|AIR|lithely regular deposits. fluffily 
+32|3|2|3|2|2210.32|0.09|0.02|N|O|1995-08-07|1995-10-07|1995-08-23|DELIVER IN PERSON|AIR| express accounts wake according to the
+32|1|3|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac
+32|3|3|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo
+32|3|2|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.
+33|1|3|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package
+33|3|2|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites
+33|3|3|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc
+33|2|4|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref
+34|3|4|1|13|17554.68|0.00|0.07|N|O|1998-10-23|1998-09-14|1998-11-06|NONE|REG AIR|nic accounts. deposits are alon
+34|1|4|2|22|30875.02|0.08|0.06|N|O|1998-10-09|1998-10-16|1998-10-12|NONE|FOB|thely slyly p
+34|2|4|3|6|9681.24|0.02|0.06|N|O|1998-10-30|1998-09-20|1998-11-05|NONE|FOB|ar foxes sleep 
+35|3|2|1|24|32410.80|0.02|0.00|N|O|1996-02-21|1996-01-03|1996-03-18|TAKE BACK RETURN|FOB|, regular tithe
+35|3|3|2|34|68065.96|0.06|0.08|N|O|1996-01-22|1996-01-06|1996-01-27|DELIVER IN PERSON|RAIL|s are carefully against the f
+35|3|2|3|7|13418.23|0.06|0.04|N|O|1996-01-19|1995-12-22|1996-01-29|NONE|MAIL| the carefully regular 
+35|3|4|4|25|29004.25|0.06|0.05|N|O|1995-11-26|1995-12-25|1995-12-21|DELIVER IN PERSON|SHIP| quickly unti
+35|1|2|5|34|65854.94|0.08|0.06|N|O|1995-11-08|1996-01-15|1995-11-26|COLLECT COD|MAIL|. silent, unusual deposits boost
+35|3|2|6|28|47397.28|0.03|0.02|N|O|1996-02-01|1995-12-24|1996-02-28|COLLECT COD|RAIL|ly alongside of 
+36|3|3|1|42|75043.92|0.09|0.00|N|O|1996-02-03|1996-01-21|1996-02-23|COLLECT COD|SHIP| careful courts. special 
+37|1|4|1|40|62105.20|0.09|0.03|A|F|1992-07-21|1992-08-01|1992-08-15|NONE|REG AIR|luffily regular requests. slyly final acco
+37|1|3|2|39|70542.42|0.05|0.02|A|F|1992-07-02|1992-08-18|1992-07-28|TAKE BACK RETURN|RAIL|the final requests. ca
+37|1|2|3|43|78083.70|0.05|0.08|A|F|1992-07-10|1992-07-06|1992-08-02|DELIVER IN PERSON|TRUCK|iously ste
+38|2|3|1|44|84252.52|0.04|0.02|N|O|1996-09-29|1996-11-17|1996-09-30|COLLECT COD|MAIL|s. blithely unusual theodolites am
+39|2|2|1|44|53782.08|0.09|0.06|N|O|1996-11-14|1996-12-15|1996-12-12|COLLECT COD|RAIL|eodolites. careful
+39|2|3|2|26|43383.08|0.08|0.04|N|O|1996-11-04|1996-10-20|1996-11-20|NONE|FOB|ckages across the slyly silent
+39|2|4|3|46|82746.18|0.06|0.08|N|O|1996-09-26|1996-12-19|1996-10-26|DELIVER IN PERSON|AIR|he carefully e
+39|3|4|4|32|48338.88|0.07|0.05|N|O|1996-10-02|1996-12-19|1996-10-14|COLLECT COD|MAIL|heodolites sleep silently pending foxes. ac
+39|2|3|5|43|63360.93|0.01|0.01|N|O|1996-10-17|1996-11-14|1996-10-26|COLLECT COD|MAIL|yly regular i
+39|2|4|6|40|54494.40|0.06|0.05|N|O|1996-12-08|1996-10-22|1997-01-01|COLLECT COD|AIR|quickly ironic fox
+64|1|4|1|21|40675.95|0.05|0.02|R|F|1994-09-30|1994-09-18|1994-10-26|DELIVER IN PERSON|REG AIR|ch slyly final, thin platelets.
+65|3|3|1|26|42995.94|0.03|0.03|A|F|1995-04-20|1995-04-25|1995-05-13|NONE|TRUCK|pending deposits nag even packages. ca
+65|2|2|2|22|39353.82|0.00|0.05|N|O|1995-07-17|1995-06-04|1995-07-19|COLLECT COD|FOB| ideas. special, r
+65|3|2|3|21|27076.98|0.09|0.07|N|O|1995-07-06|1995-05-14|1995-07-31|DELIVER IN PERSON|RAIL|bove the even packages. accounts nag carefu
+66|3|2|1|31|35126.41|0.00|0.08|R|F|1994-02-19|1994-03-11|1994-02-20|TAKE BACK RETURN|RAIL|ut the unusual accounts sleep at the bo
+66|3|3|2|41|64061.68|0.04|0.07|A|F|1994-02-21|1994-03-01|1994-03-18|COLLECT COD|AIR| regular de
+67|1|3|1|4|6230.52|0.09|0.04|N|O|1997-04-17|1997-01-31|1997-04-20|NONE|SHIP| cajole thinly expres
+67|1|3|2|12|13358.28|0.09|0.05|N|O|1997-01-27|1997-02-21|1997-02-22|NONE|REG AIR| even packages cajole
+67|1|4|3|5|8368.00|0.03|0.07|N|O|1997-02-20|1997-02-12|1997-02-21|DELIVER IN PERSON|TRUCK|y unusual packages thrash pinto 
+67|3|2|4|44|66066.44|0.08|0.06|N|O|1997-03-18|1997-01-29|1997-04-13|DELIVER IN PERSON|RAIL|se quickly above the even, express reques
+67|1|3|5|23|35733.03|0.05|0.07|N|O|1997-04-19|1997-02-14|1997-05-06|DELIVER IN PERSON|REG AIR|ly regular deposit
+67|2|4|6|29|40144.70|0.02|0.05|N|O|1997-01-25|1997-01-27|1997-01-27|DELIVER IN PERSON|FOB|ultipliers 
+68|2|4|1|3|2925.18|0.05|0.02|N|O|1998-07-04|1998-06-05|1998-07-21|NONE|RAIL|fully special instructions cajole. furious
+68|1|3|2|46|57738.28|0.02|0.05|N|O|1998-06-26|1998-06-07|1998-07-05|NONE|MAIL| requests are unusual, regular pinto 
+68|2|3|3|46|88089.08|0.04|0.05|N|O|1998-08-13|1998-07-08|1998-08-29|NONE|RAIL|egular dependencies affix ironically along 
+68|2|3|4|20|34454.40|0.07|0.01|N|O|1998-06-27|1998-05-23|1998-07-02|NONE|REG AIR| excuses integrate fluffily 
+68|3|4|5|27|47000.25|0.03|0.06|N|O|1998-06-19|1998-06-25|1998-06-29|DELIVER IN PERSON|SHIP|ccounts. deposits use. furiously
+68|1|3|6|30|46906.80|0.05|0.06|N|O|1998-08-11|1998-07-11|1998-08-14|NONE|RAIL|oxes are slyly blithely fin
+68|1|4|7|41|52735.84|0.09|0.08|N|O|1998-06-24|1998-06-27|1998-07-06|NONE|SHIP|eposits nag special ideas. furiousl
+69|3|2|1|48|58761.60|0.01|0.07|A|F|1994-08-17|1994-08-11|1994-09-08|NONE|TRUCK|regular epitaphs. carefully even ideas hag
+69|2|2|2|32|37893.76|0.08|0.06|A|F|1994-08-24|1994-08-17|1994-08-31|NONE|REG AIR|s sleep carefully bold, 
+69|2|2|3|17|22172.42|0.09|0.00|A|F|1994-07-02|1994-07-07|1994-07-03|TAKE BACK RETURN|AIR|final, pending instr
+69|3|4|4|3|4318.50|0.09|0.04|R|F|1994-06-06|1994-07-27|1994-06-15|NONE|MAIL| blithely final d
+69|3|4|5|42|44606.94|0.07|0.04|R|F|1994-07-31|1994-07-26|1994-08-28|DELIVER IN PERSON|REG AIR|tect regular, speci
+69|2|2|6|23|32717.50|0.05|0.00|A|F|1994-10-03|1994-08-06|1994-10-24|NONE|SHIP|nding accounts ca
+70|3|4|1|8|8736.96|0.03|0.08|R|F|1994-01-12|1994-02-27|1994-01-14|TAKE BACK RETURN|FOB|ggle. carefully pending dependenc
+70|3|2|2|13|16277.95|0.06|0.06|A|F|1994-03-03|1994-02-13|1994-03-26|COLLECT COD|AIR|lyly special packag
+70|2|2|3|1|1888.80|0.03|0.05|R|F|1994-01-26|1994-03-05|1994-01-28|TAKE BACK RETURN|RAIL|quickly. fluffily unusual theodolites c
+70|2|2|4|11|18477.03|0.01|0.05|A|F|1994-03-17|1994-03-17|1994-03-27|NONE|MAIL|alongside of the deposits. fur
+70|2|2|5|37|39520.81|0.09|0.04|R|F|1994-02-13|1994-03-16|1994-02-21|COLLECT COD|MAIL|n accounts are. q
+70|3|2|6|19|30602.35|0.06|0.03|A|F|1994-01-26|1994-02-17|1994-02-06|TAKE BACK RETURN|SHIP| packages wake pending accounts.
+71|3|2|1|25|47323.25|0.09|0.07|N|O|1998-04-10|1998-04-22|1998-04-11|COLLECT COD|FOB|ckly. slyly
+71|3|2|2|3|5645.73|0.09|0.07|N|O|1998-05-23|1998-04-03|1998-06-02|COLLECT COD|SHIP|y. pinto beans haggle after the
+71|3|3|3|45|61489.35|0.00|0.07|N|O|1998-02-23|1998-03-20|1998-03-24|DELIVER IN PERSON|SHIP| ironic packages believe blithely a
+71|3|2|4|33|54174.12|0.00|0.01|N|O|1998-04-12|1998-03-20|1998-04-15|NONE|FOB| serve quickly fluffily bold deposi
+71|3|2|5|39|49071.75|0.08|0.06|N|O|1998-01-29|1998-04-07|1998-02-18|DELIVER IN PERSON|RAIL|l accounts sleep across the pack
+71|1|3|6|34|58841.42|0.04|0.01|N|O|1998-03-05|1998-04-22|1998-03-30|DELIVER IN PERSON|TRUCK|s cajole. 
+96|1|2|1|23|25278.61|0.10|0.06|A|F|1994-07-19|1994-06-29|1994-07-25|DELIVER IN PERSON|TRUCK|ep-- carefully reg
+96|3|4|2|30|42761.70|0.01|0.06|R|F|1994-06-03|1994-05-29|1994-06-22|DELIVER IN PERSON|TRUCK|e quickly even ideas. furiou
+97|1|3|1|13|19454.11|0.00|0.02|R|F|1993-04-01|1993-04-04|1993-04-08|NONE|TRUCK|ayers cajole against the furiously
+97|1|3|2|37|56149.72|0.02|0.06|A|F|1993-04-13|1993-03-30|1993-04-14|DELIVER IN PERSON|SHIP|ic requests boost carefully quic
+97|1|2|3|19|31857.11|0.06|0.08|R|F|1993-05-14|1993-03-05|1993-05-25|TAKE BACK RETURN|RAIL|gifts. furiously ironic packages cajole. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_customer_large_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_customer_large_ddl.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_customer_large_ddl.sql
new file mode 100644
index 0000000..e5be236
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_customer_large_ddl.sql
@@ -0,0 +1,7 @@
+-- Large customer Table
+-- It is used for broadcast join
+
+create external table if not exists customer_large (
+    c_custkey INT4, c_name TEXT, c_address TEXT, c_nationkey INT4,
+    c_phone TEXT, c_acctbal FLOAT8, c_mktsegment TEXT, c_comment TEXT)
+using csv with ('csvfile.delimiter'='|', 'csvfile.null'='NULL') location ${table.path};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_lineitem_large_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_lineitem_large_ddl.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_lineitem_large_ddl.sql
new file mode 100644
index 0000000..366f22b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/create_lineitem_large_ddl.sql
@@ -0,0 +1,7 @@
+-- Large lineitem Table
+-- It is used for broadcast join
+
+create external table if not exists lineitem_large ( l_orderkey INT4, l_partkey INT4, l_suppkey INT4, l_linenumber INT4, l_quantity FLOAT8,
+    l_extendedprice FLOAT8, l_discount FLOAT8, l_tax FLOAT8, l_returnflag TEXT, l_linestatus TEXT, l_shipdate TEXT, l_commitdate TEXT,
+    l_receiptdate TEXT, l_shipinstruct TEXT, l_shipmode TEXT, l_comment TEXT)
+using csv with ('csvfile.delimiter'='|', 'csvfile.null'='NULL') location ${table.path};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table1_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table1_ddl.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table1_ddl.sql
new file mode 100644
index 0000000..c373a64
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table1_ddl.sql
@@ -0,0 +1,6 @@
+-- Outer Join's Left Table
+-- It is used in TestJoin::testOuterJoinAndCaseWhen
+
+create external table if not exists table1 (id int, name text, score float, type text) using csv
+with ('csvfile.delimiter'='|', 'csvfile.null'='NULL') location ${table.path};
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table2_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table2_ddl.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table2_ddl.sql
new file mode 100644
index 0000000..ec2d82f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/oj_table2_ddl.sql
@@ -0,0 +1,6 @@
+-- Outer Join's Left Table
+-- It is used in TestJoin::testOuterJoinAndCaseWhen
+
+create external table if not exists table2 (id int, name text, score float, type text) using csv
+with ('csvfile.delimiter'='|', 'csvfile.null'='NULL') location ${table.path};
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastBasicJoin.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastBasicJoin.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastBasicJoin.sql
new file mode 100644
index 0000000..24ac942
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastBasicJoin.sql
@@ -0,0 +1,11 @@
+select
+    l_orderkey,
+    p_name,
+    s_name
+from
+    lineitem_large,
+    part,
+    supplier
+where
+    lineitem_large.l_partkey = part.p_partkey
+    and lineitem_large.l_suppkey = supplier.s_suppkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery.sql
new file mode 100644
index 0000000..dcd9569
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery.sql
@@ -0,0 +1,11 @@
+select
+    l_orderkey,
+    a.o_custkey,
+    p_name
+from
+    lineitem_large,
+    part,
+    (select o_orderkey, o_custkey from orders) a
+where
+    l_partkey = p_partkey
+    and l_orderkey = a.o_orderkey
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery2.sql
new file mode 100644
index 0000000..81729c9
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastSubquery2.sql
@@ -0,0 +1,19 @@
+select sum(b.l_quantity)
+from (
+      select a.l_orderkey, a.l_quantity
+        from lineitem_large a
+        join part on a.l_partkey = p_partkey) b
+join orders c on c.o_orderkey = b.l_orderkey
+join (
+      select e.l_orderkey, avg(e.l_quantity) avg_quantity
+      from (
+          select d.l_orderkey, d.l_quantity
+            from lineitem_large d
+            join part on d.l_partkey = p_partkey
+      ) e
+      group by e.l_orderkey
+) f
+on c.o_orderkey = f.l_orderkey
+where
+  c.o_orderkey > 0 and
+  b.l_quantity > f.avg_quantity
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/464f3e52/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastTwoPartJoin.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastTwoPartJoin.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastTwoPartJoin.sql
new file mode 100644
index 0000000..bdad24f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestJoinBroadcast/testBroadcastTwoPartJoin.sql
@@ -0,0 +1,15 @@
+select
+    l_orderkey,
+    p_name,
+    n_name
+from
+    lineitem_large,
+    orders,
+    part,
+    customer_large,
+    nation
+where
+    l_orderkey = o_orderkey
+    and l_partkey = p_partkey
+    and o_custkey = c_custkey
+    and c_nationkey = n_nationkey
\ No newline at end of file