You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/18 18:25:42 UTC
[03/13] tajo git commit: TAJO-1383: Improve broadcast table cache.
(jinho)
TAJO-1383: Improve broadcast table cache. (jinho)
closes #404
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e1e38e23
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e1e38e23
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e1e38e23
Branch: refs/heads/index_support
Commit: e1e38e231867e4f6f953a7ec41f5f9d5ad242580
Parents: 7f05695
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Mar 13 16:55:25 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Mar 13 16:55:25 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../engine/planner/physical/HashJoinExec.java | 51 ++++++--
.../planner/physical/HashLeftOuterJoinExec.java | 74 ++++++++++-
.../physical/PartitionMergeScanExec.java | 17 ++-
.../tajo/engine/planner/physical/ScanExec.java | 72 +++++++++++
.../engine/planner/physical/SeqScanExec.java | 105 +++-------------
.../apache/tajo/engine/utils/CacheHolder.java | 97 +++++++++++++++
.../apache/tajo/engine/utils/TableCache.java | 84 +++++++++++++
.../apache/tajo/engine/utils/TableCacheKey.java | 57 +++++++++
.../apache/tajo/engine/utils/TupleCache.java | 122 -------------------
.../apache/tajo/engine/utils/TupleCacheKey.java | 57 ---------
.../worker/ExecutionBlockSharedResource.java | 26 ++++
.../apache/tajo/worker/TaskAttemptContext.java | 2 +-
.../apache/tajo/worker/TaskRunnerManager.java | 4 +-
.../apache/tajo/engine/util/TestTableCache.java | 109 +++++++++++++++++
.../apache/tajo/engine/util/TestTupleCache.java | 89 --------------
.../plan/serder/LogicalNodeDeserializer.java | 3 +
.../tajo/plan/serder/LogicalNodeSerializer.java | 2 +
tajo-plan/src/main/proto/Plan.proto | 1 +
19 files changed, 599 insertions(+), 375 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e6f7917..84a7571 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,8 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1383: Improve broadcast table cache. (jinho)
+
TAJO-1374: Support multi-bytes delimiter for CSV file.
(Contributed by navis, Committed by jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index d475b78..3bdf2d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -19,15 +19,18 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCacheKey;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -58,13 +61,14 @@ public class HashJoinExec extends BinaryPhysicalExec {
// projection
protected final Projector projector;
+ private TableStats cachedRightTableStats;
+
public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
PhysicalExec rightExec) {
super(context, SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(),
leftExec, rightExec);
this.plan = plan;
this.joinQual = plan.getJoinQual();
- this.tupleSlots = new HashMap<Tuple, List<Tuple>>(100000);
// HashJoin only can manage equi join key pairs.
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
@@ -151,8 +155,41 @@ public class HashJoinExec extends BinaryPhysicalExec {
}
protected void loadRightToHashTable() throws IOException {
+ ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
+ if (scanExec.canBroadcast()) {
+ /* If this table can be broadcast, all tasks on a node share the same cache */
+ TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
+ context, scanExec.getCanonicalName(), scanExec.getFragments());
+ loadRightFromCache(key);
+ } else {
+ this.tupleSlots = buildRightToHashTable();
+ }
+
+ first = false;
+ }
+
+ protected void loadRightFromCache(TableCacheKey key) throws IOException {
+ ExecutionBlockSharedResource sharedResource = context.getSharedResource();
+ synchronized (sharedResource.getLock()) {
+ if (sharedResource.hasBroadcastCache(key)) {
+ CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+ this.tupleSlots = data.getData();
+ this.cachedRightTableStats = data.getTableStats();
+ } else {
+ CacheHolder.BroadcastCacheHolder holder =
+ new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
+ sharedResource.addBroadcastCache(key, holder);
+ CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+ this.tupleSlots = data.getData();
+ this.cachedRightTableStats = data.getTableStats();
+ }
+ }
+ }
+
+ private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
Tuple tuple;
Tuple keyTuple;
+ Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
while (!context.isStopped() && (tuple = rightChild.next()) != null) {
keyTuple = new VTuple(joinKeyPairs.size());
@@ -160,18 +197,18 @@ public class HashJoinExec extends BinaryPhysicalExec {
keyTuple.put(i, tuple.get(rightKeyList[i]));
}
- List<Tuple> newValue = tupleSlots.get(keyTuple);
+ List<Tuple> newValue = map.get(keyTuple);
if (newValue != null) {
newValue.add(tuple);
} else {
newValue = new ArrayList<Tuple>();
newValue.add(tuple);
- tupleSlots.put(keyTuple, newValue);
+ map.put(keyTuple, newValue);
}
}
- first = false;
+ return map;
}
@Override
@@ -219,7 +256,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
inputStats.setNumRows(leftInputStats.getNumRows());
}
- TableStats rightInputStats = rightChild.getInputStats();
+ TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
if (rightInputStats != null) {
inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 233ef92..e78cb20 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -22,7 +22,10 @@ import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCacheKey;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.catalog.SchemaUtil;
@@ -33,6 +36,7 @@ import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -66,6 +70,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
protected Projector projector;
private int rightNumCols;
+ private TableStats cachedRightTableStats;
private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
@@ -91,8 +96,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
this.joinFilter = null;
}
- this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
-
// HashJoin only can manage equi join key pairs.
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(),
rightChild.getSchema(), false);
@@ -201,8 +204,41 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
}
protected void loadRightToHashTable() throws IOException {
+ ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
+ if (scanExec.canBroadcast()) {
+ /* If this table can be broadcast, all tasks on a node share the same cache */
+ TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
+ context, scanExec.getCanonicalName(), scanExec.getFragments());
+ loadRightFromCache(key);
+ } else {
+ this.tupleSlots = buildRightToHashTable();
+ }
+
+ first = false;
+ }
+
+ protected void loadRightFromCache(TableCacheKey key) throws IOException {
+ ExecutionBlockSharedResource sharedResource = context.getSharedResource();
+ synchronized (sharedResource.getLock()) {
+ if (sharedResource.hasBroadcastCache(key)) {
+ CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+ this.tupleSlots = data.getData();
+ this.cachedRightTableStats = data.getTableStats();
+ } else {
+ CacheHolder.BroadcastCacheHolder holder =
+ new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null);
+ sharedResource.addBroadcastCache(key, holder);
+ CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key);
+ this.tupleSlots = data.getData();
+ this.cachedRightTableStats = data.getTableStats();
+ }
+ }
+ }
+
+ private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
Tuple tuple;
Tuple keyTuple;
+ Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
while (!context.isStopped() && (tuple = rightChild.next()) != null) {
keyTuple = new VTuple(joinKeyPairs.size());
@@ -210,16 +246,18 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
keyTuple.put(i, tuple.get(rightKeyList[i]));
}
- List<Tuple> newValue = tupleSlots.get(keyTuple);
+ List<Tuple> newValue = map.get(keyTuple);
+
if (newValue != null) {
newValue.add(tuple);
} else {
newValue = new ArrayList<Tuple>();
newValue.add(tuple);
- tupleSlots.put(keyTuple, newValue);
+ map.put(keyTuple, newValue);
}
}
- first = false;
+
+ return map;
}
@Override
@@ -250,5 +288,31 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
public JoinNode getPlan() {
return this.plan;
}
+
+ @Override
+ public TableStats getInputStats() {
+ if (leftChild == null) {
+ return inputStats;
+ }
+ TableStats leftInputStats = leftChild.getInputStats();
+ inputStats.setNumBytes(0);
+ inputStats.setReadBytes(0);
+ inputStats.setNumRows(0);
+
+ if (leftInputStats != null) {
+ inputStats.setNumBytes(leftInputStats.getNumBytes());
+ inputStats.setReadBytes(leftInputStats.getReadBytes());
+ inputStats.setNumRows(leftInputStats.getNumRows());
+ }
+
+ TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats;
+ if (rightInputStats != null) {
+ inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+ inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+ inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+ }
+
+ return inputStats;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 5692308..a1eaa48 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -34,7 +34,7 @@ import java.util.List;
/**
* A Scanner that reads multiple partitions
*/
-public class PartitionMergeScanExec extends PhysicalExec {
+public class PartitionMergeScanExec extends ScanExec {
private final ScanNode plan;
private SeqScanExec currentScanner = null;
@@ -56,14 +56,16 @@ public class PartitionMergeScanExec extends PhysicalExec {
inputStats = new TableStats();
}
+ @Override
public void init() throws IOException {
for (CatalogProtos.FragmentProto fragment : fragments) {
SeqScanExec scanExec = new SeqScanExec(context, (ScanNode) PlannerUtil.clone(null, plan),
- new CatalogProtos.FragmentProto[] {fragment});
+ new CatalogProtos.FragmentProto[]{fragment});
scanners.add(scanExec);
}
progress = 0.0f;
rescan();
+ super.init();
}
@Override
@@ -112,11 +114,22 @@ public class PartitionMergeScanExec extends PhysicalExec {
progress = 1.0f;
}
+ @Override
public String getTableName() {
return plan.getTableName();
}
@Override
+ public String getCanonicalName() {
+ return plan.getCanonicalName();
+ }
+
+ @Override
+ public CatalogProtos.FragmentProto[] getFragments() {
+ return fragments;
+ }
+
+ @Override
public float getProgress() {
if (iterator != null) {
float progressSum = 0.0f;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
new file mode 100644
index 0000000..86874ba
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class ScanExec extends PhysicalExec {
+
+ /* whether this scan's table is a broadcast table */
+ private boolean canBroadcast;
+
+ public ScanExec(TaskAttemptContext context, Schema inSchema, Schema outSchema) {
+ super(context, inSchema, outSchema);
+ }
+
+ public abstract String getTableName();
+
+ public abstract String getCanonicalName();
+
+ public abstract CatalogProtos.FragmentProto[] getFragments();
+
+ @Override
+ public void init() throws IOException {
+ canBroadcast = checkIfBroadcast();
+
+ super.init();
+ }
+
+ public boolean canBroadcast() {
+ return canBroadcast;
+ }
+
+ /* check whether the enforcer marks this scan's table as broadcast */
+ private boolean checkIfBroadcast() {
+ Enforcer enforcer = context.getEnforcer();
+
+ if (enforcer != null && enforcer.hasEnforceProperty(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST)) {
+ List<TajoWorkerProtocol.EnforceProperty> properties =
+ enforcer.getEnforceProperties(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST);
+
+ for (TajoWorkerProtocol.EnforceProperty property : properties) {
+ if (getCanonicalName().equals(property.getBroadcast().getTableName())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c62027d..1078c80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -21,17 +21,14 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.utils.TupleCache;
-import org.apache.tajo.engine.utils.TupleCacheKey;
-import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.ConstEval;
import org.apache.tajo.plan.expr.EvalNode;
@@ -42,18 +39,16 @@ import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-public class SeqScanExec extends PhysicalExec {
+public class SeqScanExec extends ScanExec {
private ScanNode plan;
private Scanner scanner = null;
@@ -66,10 +61,6 @@ public class SeqScanExec extends PhysicalExec {
private TableStats inputStats;
- private TupleCacheKey cacheKey;
-
- private boolean cacheRead = false;
-
public SeqScanExec(TaskAttemptContext context, ScanNode plan,
CatalogProtos.FragmentProto [] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -78,21 +69,6 @@ public class SeqScanExec extends PhysicalExec {
this.qual = plan.getQual();
this.fragments = fragments;
- if (plan.isBroadcastTable()) {
- String pathNameKey = "";
- if (fragments != null) {
- StringBuilder stringBuilder = new StringBuilder();
- for (FragmentProto f : fragments) {
- Fragment fragement = FragmentConvertor.convert(context.getConf(), f);
- stringBuilder.append(fragement.getKey());
- }
- pathNameKey = stringBuilder.toString();
- }
-
- cacheKey = new TupleCacheKey(
- context.getTaskId().getTaskId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey);
- }
-
if (fragments != null
&& plan.getTableDesc().hasPartition()
&& plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
@@ -153,6 +129,7 @@ public class SeqScanExec extends PhysicalExec {
}
}
+ @Override
public void init() throws IOException {
Schema projected;
@@ -177,33 +154,7 @@ public class SeqScanExec extends PhysicalExec {
projected = outSchema;
}
- if (cacheKey != null) {
- TupleCache tupleCache = TupleCache.getInstance();
- if (tupleCache.isBroadcastCacheReady(cacheKey)) {
- openCacheScanner();
- } else {
- if (TupleCache.getInstance().lockBroadcastScan(cacheKey)) {
- scanAndAddCache(projected);
- openCacheScanner();
- } else {
- Object lockMonitor = tupleCache.getLockMonitor();
- synchronized (lockMonitor) {
- try {
- lockMonitor.wait(20 * 1000);
- } catch (InterruptedException e) {
- }
- }
- if (tupleCache.isBroadcastCacheReady(cacheKey)) {
- openCacheScanner();
- } else {
- initScanner(projected);
- }
- }
- }
- } else {
- initScanner(projected);
- }
-
+ initScanner(projected);
super.init();
}
@@ -216,7 +167,7 @@ public class SeqScanExec extends PhysicalExec {
private void initScanner(Schema projected) throws IOException {
this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
- TableMeta meta = null;
+ TableMeta meta;
try {
meta = (TableMeta) plan.getTableDesc().getMeta().clone();
} catch (CloneNotSupportedException e) {
@@ -241,35 +192,6 @@ public class SeqScanExec extends PhysicalExec {
}
}
- private void openCacheScanner() throws IOException {
- Scanner cacheScanner = TupleCache.getInstance().openCacheScanner(cacheKey, plan.getPhysicalSchema());
- if (cacheScanner != null) {
- scanner = cacheScanner;
- cacheRead = true;
- }
- }
-
- private void scanAndAddCache(Schema projected) throws IOException {
- initScanner(projected);
-
- List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>();
- while (!context.isStopped()) {
- Tuple tuple = next();
- if (tuple != null) {
- broadcastTupleCacheList.add(tuple);
- } else {
- break;
- }
- }
-
- if (scanner != null) {
- scanner.close();
- scanner = null;
- }
-
- TupleCache.getInstance().addBroadcastCache(cacheKey, broadcastTupleCacheList);
- }
-
@Override
public Tuple next() throws IOException {
if (fragments == null) {
@@ -281,9 +203,6 @@ public class SeqScanExec extends PhysicalExec {
if (!plan.hasQual()) {
if ((tuple = scanner.next()) != null) {
- if (cacheRead) {
- return tuple;
- }
projector.eval(tuple, outTuple);
outTuple.setOffset(tuple.getOffset());
return outTuple;
@@ -292,9 +211,6 @@ public class SeqScanExec extends PhysicalExec {
}
} else {
while ((tuple = scanner.next()) != null) {
- if (cacheRead) {
- return tuple;
- }
if (qual.eval(inSchema, tuple).isTrue()) {
projector.eval(tuple, outTuple);
return outTuple;
@@ -328,11 +244,22 @@ public class SeqScanExec extends PhysicalExec {
projector = null;
}
+ @Override
public String getTableName() {
return plan.getTableName();
}
@Override
+ public String getCanonicalName() {
+ return plan.getCanonicalName();
+ }
+
+ @Override
+ public CatalogProtos.FragmentProto[] getFragments() {
+ return fragments;
+ }
+
+ @Override
public float getProgress() {
if (scanner == null) {
return 1.0f;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
new file mode 100644
index 0000000..6a5c0bf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import com.google.common.collect.Maps;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface CacheHolder<T> {
+
+ /**
+ * Get the shared data from the TableCache.
+ */
+ T getData();
+
+ /**
+ * Get the shared table stats from the TableCache.
+ */
+ TableStats getTableStats();
+
+ /**
+ * Release the memory held by this cache.
+ *
+ */
+ void release();
+
+ /**
+ * This is a cache holder for a join table.
+ * It is released when the execution block is finished.
+ */
+ public static class BroadcastCacheHolder implements CacheHolder<Map<Tuple, List<Tuple>>> {
+ private Map<Tuple, List<Tuple>> data;
+ private Deallocatable rowBlock;
+ private TableStats tableStats;
+
+ public BroadcastCacheHolder(Map<Tuple, List<Tuple>> data, TableStats tableStats, Deallocatable rowBlock){
+ this.data = data;
+ this.tableStats = tableStats;
+ this.rowBlock = rowBlock;
+ }
+
+ @Override
+ public Map<Tuple, List<Tuple>> getData() {
+ return Maps.newHashMap(data);
+ }
+
+ @Override
+ public TableStats getTableStats(){
+ return tableStats;
+ }
+
+ @Override
+ public void release() {
+ if(rowBlock != null) rowBlock.release();
+ }
+
+ public static TableCacheKey getCacheKey(TaskAttemptContext ctx, String canonicalName,
+ CatalogProtos.FragmentProto[] fragments) throws IOException {
+ String pathNameKey = "";
+ if (fragments != null) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (CatalogProtos.FragmentProto f : fragments) {
+ Fragment fragement = FragmentConvertor.convert(ctx.getConf(), f);
+ stringBuilder.append(fragement.getKey());
+ }
+ pathNameKey = stringBuilder.toString();
+ }
+
+ return new TableCacheKey(ctx.getTaskId().getTaskId().getExecutionBlockId().toString(), canonicalName, pathNameKey);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java
new file mode 100644
index 0000000..f2a2217
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a simple TableCache which just adds CacheHolders as needed.
+ */
+ public class TableCache {
+ public static final Log LOG = LogFactory.getLog(TableCache.class);
+
+ private static TableCache instance;
+ private Map<TableCacheKey, CacheHolder<?>> cacheMap = Maps.newHashMap();
+
+ private TableCache() {
+ }
+
+ public static synchronized TableCache getInstance() {
+ if (instance == null) {
+ instance = new TableCache();
+ }
+ return instance;
+ }
+
+ public synchronized void releaseCache(ExecutionBlockId ebId) {
+ if (ebId == null) {
+ return;
+ }
+
+ List<TableCacheKey> keys = getCacheKeyByExecutionBlockId(ebId);
+
+ for (TableCacheKey cacheKey: keys) {
+ cacheMap.remove(cacheKey).release();
+ LOG.info("Removed Broadcast Table Cache: " + cacheKey.getTableName() + " EbId: " + cacheKey.ebId);
+ }
+ }
+
+ public synchronized List<TableCacheKey> getCacheKeyByExecutionBlockId(ExecutionBlockId ebId) {
+ List<TableCacheKey> keys = Lists.newArrayList();
+ for (TableCacheKey eachKey : cacheMap.keySet()) {
+ if (eachKey.ebId.equals(ebId.toString())) {
+ keys.add(eachKey);
+ }
+ }
+ return keys;
+ }
+
+ public synchronized void addCache(TableCacheKey cacheKey, CacheHolder<?> cacheData) {
+ cacheMap.put(cacheKey, cacheData);
+ LOG.info("Added Broadcast Table Cache: " + cacheKey.getTableName() + " EbId: " + cacheKey.ebId);
+ }
+
+ public synchronized boolean hasCache(TableCacheKey cacheKey) {
+ return cacheMap.containsKey(cacheKey);
+ }
+
+ public synchronized CacheHolder<?> getCache(TableCacheKey cacheKey) {
+ return cacheMap.get(cacheKey);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java
new file mode 100644
index 0000000..81a4b58
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+/**
+ * Identifies one cache entry by execution block id, table name and path name.
+ *
+ * <p>Two keys are equal only when all three fields are equal. Because the hash code depends on
+ * {@code tableName}, {@link #setTableName(String)} must not be called on a key that is already
+ * stored in a hash-based collection.
+ */
+public class TableCacheKey {
+  String ebId;
+  String tableName;
+  String pathName;
+
+  /**
+   * @param ebId      string form of the execution block id
+   * @param tableName name of the table
+   * @param pathName  path name of the table
+   */
+  public TableCacheKey(String ebId, String tableName, String pathName) {
+    this.ebId = ebId;
+    this.tableName = tableName;
+    this.pathName = pathName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /** Combines the three fields; consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    int result = hashOf(ebId);
+    result = 31 * result + hashOf(tableName);
+    result = 31 * result + hashOf(pathName);
+    return result;
+  }
+
+  /**
+   * Compares field by field rather than through {@link #toString()}, so values that contain
+   * the "," separator (or the text "null") cannot make two different keys look equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TableCacheKey other = (TableCacheKey) o;
+    return same(ebId, other.ebId)
+        && same(tableName, other.tableName)
+        && same(pathName, other.pathName);
+  }
+
+  @Override
+  public String toString() {
+    return ebId + "," + tableName + "," + pathName;
+  }
+
+  /** Null-safe string equality. */
+  private static boolean same(String a, String b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** Null-safe string hash; {@code null} hashes to 0. */
+  private static int hashOf(String s) {
+    return s == null ? 0 : s.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
deleted file mode 100644
index 00647b5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TupleCache {
- private static TupleCache instance;
-
- private Map<TupleCacheKey, List<Tuple>> broadcastTupleCacheData
- = new HashMap<TupleCacheKey, List<Tuple>>();
- private Map<TupleCacheKey, TupleCacheStatus> broadcastTupleCacheStatus
- = new HashMap<TupleCacheKey, TupleCacheStatus>();
-
- private Object lockMonitor = new Object();
-
- public static enum TupleCacheStatus {
- STARTED,
- ENDED
- };
-
- private TupleCache() {
- }
-
- public static synchronized TupleCache getInstance() {
- if (instance == null) {
- instance = new TupleCache();
- }
- return instance;
- }
-
- public Object getLockMonitor() {
- return lockMonitor;
- }
-
- public void removeBroadcastCache(ExecutionBlockId ebId) {
- if (ebId == null) {
- return;
- }
- synchronized (lockMonitor) {
- TupleCacheKey matchedKey = null;
- for (TupleCacheKey eachKey: broadcastTupleCacheStatus.keySet()) {
- if (eachKey.ebId.equals(ebId.toString())) {
- matchedKey = eachKey;
- break;
- }
- }
- if (matchedKey != null) {
- broadcastTupleCacheStatus.remove(matchedKey);
- broadcastTupleCacheData.remove(matchedKey);
- }
- }
- }
-
- public void addBroadcastCache(TupleCacheKey cacheKey, List<Tuple> cacheData) {
- synchronized (lockMonitor) {
- if (broadcastTupleCacheStatus.containsKey(cacheKey) &&
- broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED) {
- return;
- }
- broadcastTupleCacheData.put(cacheKey, cacheData);
- broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.ENDED);
- lockMonitor.notifyAll();
- }
- }
-
- public boolean lockBroadcastScan(TupleCacheKey cacheKey) {
- synchronized (lockMonitor) {
- if (broadcastTupleCacheStatus.containsKey(cacheKey)) {
- return false;
- } else {
- broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.STARTED);
- return true;
- }
- }
- }
-
- public boolean isBroadcastCacheReady(TupleCacheKey cacheKey) {
- synchronized (lockMonitor) {
- if (!broadcastTupleCacheStatus.containsKey(cacheKey)) {
- return false;
- }
- return broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED;
- }
- }
-
- public TupleCacheScanner openCacheScanner(TupleCacheKey cacheKey, Schema schema) throws IOException {
- synchronized (lockMonitor) {
- List<Tuple> cacheData = broadcastTupleCacheData.get(cacheKey);
- if (cacheData != null) {
- TupleCacheScanner scanner = new TupleCacheScanner(cacheData, schema);
- scanner.init();
- return scanner;
- } else {
- return null;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
deleted file mode 100644
index 1cb01c2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-public class TupleCacheKey {
- String ebId;
- String tableName;
- String pathName;
-
- public TupleCacheKey(String ebId, String tableName, String pathName) {
- this.ebId = ebId;
- this.tableName = tableName;
- this.pathName = pathName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- return toString().equals(o.toString());
- }
-
- @Override
- public String toString() {
- return ebId + "," + tableName + "," + pathName;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index b193b24..494fd7f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -21,12 +21,16 @@ package org.apache.tajo.worker;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
import org.apache.tajo.engine.codegen.TajoClassLoader;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCache;
+import org.apache.tajo.engine.utils.TableCacheKey;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
@@ -38,6 +42,7 @@ public class ExecutionBlockSharedResource {
private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class);
private AtomicBoolean initializing = new AtomicBoolean(false);
private volatile Boolean resourceInitSuccess = Boolean.valueOf(false);
+ private final Object lock = new Object();
// Query
private QueryContext context;
@@ -108,6 +113,27 @@ public class ExecutionBlockSharedResource {
}
}
+ /**
+  * Returns the lock object owned by this shared resource, so that all users of the same
+  * ExecutionBlock resource can synchronize on one common monitor.
+  *
+  * @return the lock object of this resource; the same instance on every call
+  */
+ public synchronized Object getLock() {
+ return lock;
+ }
+
+ /**
+  * Checks the shared {@link TableCache} for an entry stored under the given key.
+  *
+  * @param key cache key to look up
+  * @return {@code true} when the table cache holds an entry for the key
+  */
+ public boolean hasBroadcastCache(TableCacheKey key) {
+   TableCache tableCache = TableCache.getInstance();
+   return tableCache.hasCache(key);
+ }
+
+ /**
+  * Looks up the holder stored in the shared {@link TableCache} under the given key.
+  *
+  * <p>The cast to {@code CacheHolder<T>} is unchecked: the caller chooses {@code T} and nothing
+  * here verifies it against what was actually stored.
+  *
+  * @param key cache key to look up
+  * @return whatever {@code TableCache.getCache(key)} returns, viewed as {@code CacheHolder<T>}
+  */
+ public <T extends Object> CacheHolder<T> getBroadcastCache(TableCacheKey key) {
+ return (CacheHolder<T>) TableCache.getInstance().getCache(key);
+ }
+
+ /**
+  * Stores a holder in the shared {@link TableCache} under the given key.
+  *
+  * @param cacheKey  key identifying the cached table
+  * @param cacheData holder to store
+  */
+ public void addBroadcastCache(TableCacheKey cacheKey, CacheHolder<?> cacheData) {
+   TableCache tableCache = TableCache.getInstance();
+   tableCache.addCache(cacheKey, cacheData);
+ }
+
+ /**
+  * Asks the shared {@link TableCache} to release the entries of the given execution block.
+  *
+  * @param id execution block whose cache entries are released
+  */
+ public void releaseBroadcastCache(ExecutionBlockId id) {
+   TableCache tableCache = TableCache.getInstance();
+   tableCache.releaseCache(id);
+ }
+
public void release() {
compilationContext = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 50cd20a..706e9b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 3f4a1b8..a375a31 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -30,7 +29,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.utils.TupleCache;
import org.apache.tajo.worker.event.TaskRunnerEvent;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import org.apache.tajo.worker.event.TaskRunnerStopEvent;
@@ -184,7 +182,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
ExecutionBlockContext executionBlockContext = executionBlockContextMap.remove(event.getExecutionBlockId());
if(executionBlockContext != null){
try {
- TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId());
+ executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId());
executionBlockContext.reportExecutionBlock(event.getExecutionBlockId());
workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId());
workerContext.getTaskHistoryWriter().flushTaskHistories();
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java
new file mode 100644
index 0000000..f10f2a1
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.util;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.utils.CacheHolder;
+import org.apache.tajo.engine.utils.TableCache;
+import org.apache.tajo.engine.utils.TableCacheKey;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestTableCache {
+
+  /**
+   * Lets many concurrent tasks race to populate one cache entry and checks that every task
+   * observes the same cached value, then that releasing the execution block removes the entry.
+   */
+  @Test
+  public void testBroadcastTableCache() throws Exception {
+
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(
+        QueryIdFactory.newQueryId(System.currentTimeMillis(), 0));
+
+    final TableCacheKey key = new TableCacheKey(ebId.toString(), "testBroadcastTableCache", "path");
+    final ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
+
+    final int parallelCount = 30;
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+    try {
+      List<Future<CacheHolder<Long>>> tasks = new ArrayList<Future<CacheHolder<Long>>>();
+      for (int i = 0; i < parallelCount; i++) {
+        tasks.add(executor.submit(createTask(key, resource)));
+      }
+
+      long expected = tasks.get(0).get().getData().longValue();
+
+      for (Future<CacheHolder<Long>> future : tasks) {
+        assertEquals(expected, future.get().getData().longValue());
+      }
+
+      resource.releaseBroadcastCache(ebId);
+      assertFalse(resource.hasBroadcastCache(key));
+    } finally {
+      // Stop the worker threads even when an assertion or a task fails.
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Builds a task that adds a holder for {@code key} if none is cached yet, doing the
+   * check-then-add under the resource's lock, and then returns the holder found in the cache.
+   * The holder's data is the {@code System.nanoTime()} value taken by the task that created it.
+   */
+  private Callable<CacheHolder<Long>> createTask(final TableCacheKey key, final ExecutionBlockSharedResource resource) {
+    return new Callable<CacheHolder<Long>>() {
+      @Override
+      public CacheHolder<Long> call() throws Exception {
+        synchronized (resource.getLock()) {
+          if (!TableCache.getInstance().hasCache(key)) {
+            final long nanoTime = System.nanoTime();
+            final TableStats tableStats = new TableStats();
+            tableStats.setNumRows(100);
+            tableStats.setNumBytes(1000);
+
+            final CacheHolder<Long> cacheHolder = new CacheHolder<Long>() {
+
+              @Override
+              public Long getData() {
+                return nanoTime;
+              }
+
+              @Override
+              public TableStats getTableStats() {
+                return tableStats;
+              }
+
+              @Override
+              public void release() {
+
+              }
+            };
+
+            resource.addBroadcastCache(key, cacheHolder);
+          }
+        }
+
+        // The getter is generic, so name the element type instead of casting the result.
+        return resource.<Long>getBroadcastCache(key);
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
deleted file mode 100644
index 3d2f307..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.util;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.engine.utils.TupleCache;
-import org.apache.tajo.engine.utils.TupleCacheKey;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertFalse;
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertNotNull;
-
-public class TestTupleCache {
- @Test
- public void testTupleCcaheBasicFunction() throws Exception {
- List<Tuple> tupleData = new ArrayList<Tuple>();
- for (int i = 0; i < 100; i++) {
- Datum[] datums = new Datum[5];
- for (int j = 0; j < 5; j++) {
- datums[j] = new TextDatum(i + "_" + j);
- }
- Tuple tuple = new VTuple(datums);
- tupleData.add(tuple);
- }
-
- ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(
- QueryIdFactory.newQueryId(System.currentTimeMillis(), 0));
-
- TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable", "test");
- TupleCache tupleCache = TupleCache.getInstance();
-
- assertFalse(tupleCache.isBroadcastCacheReady(cacheKey));
- assertTrue(tupleCache.lockBroadcastScan(cacheKey));
- assertFalse(tupleCache.lockBroadcastScan(cacheKey));
-
- tupleCache.addBroadcastCache(cacheKey, tupleData);
- assertTrue(tupleCache.isBroadcastCacheReady(cacheKey));
-
- Scanner scanner = tupleCache.openCacheScanner(cacheKey, null);
- assertNotNull(scanner);
-
- int count = 0;
-
- while (true) {
- Tuple tuple = scanner.next();
- if (tuple == null) {
- break;
- }
-
- assertEquals(tupleData.get(count), tuple);
- count++;
- }
-
- assertEquals(tupleData.size(), count);
-
- tupleCache.removeBroadcastCache(ebId);
- assertFalse(tupleCache.isBroadcastCacheReady(cacheKey));
- assertTrue(tupleCache.lockBroadcastScan(cacheKey));
-
- tupleCache.removeBroadcastCache(ebId);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 5cbed7e..3387157 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -416,6 +416,9 @@ public class LogicalNodeDeserializer {
scan.setQual(EvalNodeDeserializer.deserialize(context, scanProto.getQual()));
}
+ if(scanProto.hasBroadcast()){
+ scan.setBroadcastTable(scanProto.getBroadcast());
+ }
scan.setInSchema(convertSchema(protoNode.getInSchema()));
scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index 39a13ba..1bde955 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -431,6 +431,8 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
if (scan.hasQual()) {
scanBuilder.setQual(EvalNodeSerializer.serialize(scan.getQual()));
}
+
+ scanBuilder.setBroadcast(scan.isBroadcastTable());
return scanBuilder;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 3e4f07c..02f52ff 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -104,6 +104,7 @@ message ScanNode {
required bool existTargets = 3;
repeated Target targets = 4;
optional EvalNodeTree qual = 5;
+ optional bool broadcast = 6;
}
message PartitionScanSpec {