You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/11 05:51:59 UTC
[14/61] [abbrv] git commit: Enable View persistence,
Storage Plugin and System option persistence.
Enable View persistence, Storage Plugin and System option persistence.
Conflicts:
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/872d0abf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/872d0abf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/872d0abf
Branch: refs/heads/master
Commit: 872d0abf5218ea7da6a22f42a5917e8a88447e26
Parents: 37a4776
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 29 16:44:32 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jun 8 19:13:04 2014 -0700
----------------------------------------------------------------------
.../drill/common/expression/parser/ExprParser.g | 2 +
.../drill/common/logical/PlanProperties.java | 13 +-
.../resources/bootstrap-storage-plugins.json | 11 +
.../src/test/resources/storage-plugins.json | 11 -
.../resources/bootstrap-storage-plugins.json | 70 +
distribution/src/resources/storage-plugins.json | 70 -
exec/java-exec/pom.xml | 20 +-
.../org/apache/drill/exec/ExecConstants.java | 2 +
.../exec/cache/AbstractDataSerializable.java | 33 -
.../drill/exec/cache/DistributedCache.java | 165 +-
.../apache/drill/exec/cache/DistributedMap.java | 13 +-
.../exec/cache/DistributedMapDeserializer.java | 28 -
.../drill/exec/cache/DistributedMultiMap.java | 6 +-
.../exec/cache/JacksonAdvancedSerializer.java | 65 -
.../exec/cache/JacksonDrillSerializable.java | 77 -
.../drill/exec/cache/JacksonSerializable.java | 68 -
.../drill/exec/cache/ProtoSerializable.java | 90 -
.../exec/cache/ProtobufDrillSerializable.java | 73 -
.../exec/cache/SerializationDefinition.java | 7 +-
.../cache/VectorAccessibleSerializable.java | 2 +-
.../hazel/HCVectorAccessibleSerializer.java | 58 -
.../drill/exec/cache/hazel/HazelCache.java | 260 --
.../drill/exec/cache/hazel/ProtoBufImpl.java | 49 -
.../drill/exec/cache/hazel/ProtoBufWrap.java | 67 -
.../apache/drill/exec/cache/hazel/ProtoMap.java | 52 -
.../drill/exec/cache/infinispan/ICache.java | 172 +-
.../drill/exec/cache/local/LocalCache.java | 271 +-
.../apache/drill/exec/client/DrillClient.java | 2 +-
.../drill/exec/client/QuerySubmitter.java | 2 +-
.../drill/exec/coord/ClusterCoordinator.java | 6 +-
.../drill/exec/coord/DistributedSemaphore.java | 26 +
.../exec/coord/DrillServiceInstanceHelper.java | 13 +-
.../exec/coord/LocalClusterCoordinator.java | 96 -
.../drill/exec/coord/ZKClusterCoordinator.java | 198 --
.../drill/exec/coord/ZKRegistrationHandle.java | 32 -
.../coord/local/LocalClusterCoordinator.java | 139 +
.../exec/coord/zk/ZKClusterCoordinator.java | 208 ++
.../exec/coord/zk/ZKRegistrationHandle.java | 33 +
.../exec/coord/zk/ZkDistributedSemaphore.java | 61 +
.../drill/exec/dotdrill/DotDrillFile.java | 58 +
.../drill/exec/dotdrill/DotDrillType.java | 52 +
.../drill/exec/dotdrill/DotDrillUtil.java | 61 +
.../org/apache/drill/exec/dotdrill/View.java | 146 +
.../org/apache/drill/exec/ops/QueryContext.java | 10 +-
.../OrderedPartitionRecordBatch.java | 23 +-
.../exec/physical/impl/xsort/BatchGroup.java | 2 +-
.../exec/planner/logical/DrillViewTable.java | 53 +-
.../exec/planner/logical/StoragePlugins.java | 3 +-
.../planner/sql/ExpandingConcurrentMap.java | 23 +-
.../exec/planner/sql/handlers/ViewHandler.java | 19 +-
.../apache/drill/exec/record/WritableBatch.java | 4 +-
.../drill/exec/rpc/control/WorkEventBus.java | 15 +-
.../apache/drill/exec/rpc/user/UserSession.java | 3 -
.../apache/drill/exec/rpc/user/ViewStore.java | 93 -
.../org/apache/drill/exec/server/Drillbit.java | 16 +-
.../drill/exec/server/DrillbitContext.java | 12 +-
.../drill/exec/server/RemoteServiceSet.java | 4 +-
.../drill/exec/server/options/OptionValue.java | 8 +-
.../server/options/SystemOptionManager.java | 39 +-
.../drill/exec/server/rest/DrillRoot.java | 14 +-
.../drill/exec/store/StoragePluginRegistry.java | 102 +-
.../drill/exec/store/dfs/FileSystemPlugin.java | 8 +-
.../exec/store/dfs/WorkspaceSchemaFactory.java | 127 +-
.../org/apache/drill/exec/store/sys/PTable.java | 27 +
.../drill/exec/store/sys/PTableConfig.java | 77 +
.../drill/exec/store/sys/TableProvider.java | 26 +
.../drill/exec/store/sys/local/LocalTable.java | 179 +
.../store/sys/local/LocalTableProvider.java | 61 +
.../exec/store/sys/local/NoWriteLocalTable.java | 63 +
.../store/sys/serialize/JacksonSerializer.java | 46 +
.../store/sys/serialize/PClassSerializer.java | 25 +
.../store/sys/serialize/ProtoSerializer.java | 55 +
.../drill/exec/store/sys/zk/ZkPTable.java | 182 ++
.../exec/store/sys/zk/ZkTableProvider.java | 50 +
.../org/apache/drill/exec/work/WorkManager.java | 5 +-
.../exec/work/batch/SpoolingRawBatchBuffer.java | 1 +
.../apache/drill/exec/work/foreman/Foreman.java | 12 +-
.../drill/exec/work/foreman/QueryStatus.java | 14 +-
.../src/main/resources/drill-module.conf | 12 +-
.../java/org/apache/drill/PlanningBase.java | 8 +-
.../apache/drill/exec/DrillSystemTestBase.java | 44 +-
.../apache/drill/exec/TestWithZookeeper.java | 81 +
.../java/org/apache/drill/exec/cache/ISpan.java | 94 -
.../exec/cache/TestCacheSerialization.java | 68 +-
.../exec/client/DrillClientSystemTest.java | 6 +-
.../exec/physical/impl/TestOptiqPlans.java | 3 +-
.../drill/exec/store/TestOrphanSchema.java | 85 -
.../drill/exec/store/sys/PTableTestUtil.java | 66 +
.../exec/store/sys/TestTableProviders.java | 58 +
.../resources/bootstrap-storage-plugins.json | 54 +
.../src/test/resources/drill-module.conf | 2 +-
.../src/test/resources/storage-plugins.json | 54 -
.../apache/drill/jdbc/test/TestJdbcQuery.java | 9 +-
.../resources/bootstrap-storage-plugins.json | 54 +
.../src/test/resources/storage-plugins.json | 54 -
pom.xml | 2 +-
.../drill/exec/proto/BitControlHandshake.java | 206 ++
.../org/apache/drill/exec/proto/BitStatus.java | 174 +
.../apache/drill/exec/proto/FragmentStatus.java | 187 ++
.../apache/drill/exec/proto/PlanFragment.java | 466 +++
.../org/apache/drill/exec/proto/RpcType.java | 65 +
.../drill/exec/proto/WorkQueueStatus.java | 206 ++
protocol/output | 3076 ++++++++++++++++++
protocol/pom.xml | 93 +-
.../org/apache/drill/common/types/DataMode.java | 51 +
.../apache/drill/common/types/MajorType.java | 273 ++
.../apache/drill/common/types/MinorType.java | 117 +
.../drill/common/types/SchemaTypeProtos.java | 173 +
.../org/apache/drill/exec/proto/BitControl.java | 60 +-
.../org/apache/drill/exec/proto/ExecProtos.java | 643 +++-
.../drill/exec/proto/SchemaBitControl.java | 733 +++++
.../apache/drill/exec/proto/SchemaBitData.java | 415 +++
.../exec/proto/SchemaCoordinationProtos.java | 434 +++
.../drill/exec/proto/SchemaExecProtos.java | 272 ++
.../exec/proto/SchemaGeneralRPCProtos.java | 524 +++
.../drill/exec/proto/SchemaSchemaDefProtos.java | 26 +
.../drill/exec/proto/SchemaUserBitShared.java | 1801 ++++++++++
.../drill/exec/proto/SchemaUserProtos.java | 1064 ++++++
.../apache/drill/exec/proto/UserBitShared.java | 190 +-
.../org/apache/drill/exec/proto/beans/Ack.java | 163 +
.../exec/proto/beans/BitClientHandshake.java | 209 ++
.../exec/proto/beans/BitControlHandshake.java | 209 ++
.../exec/proto/beans/BitServerHandshake.java | 163 +
.../drill/exec/proto/beans/BitStatus.java | 175 +
.../exec/proto/beans/BitToUserHandshake.java | 163 +
.../exec/proto/beans/CompleteRpcMessage.java | 210 ++
.../exec/proto/beans/CoreOperatorType.java | 107 +
.../drill/exec/proto/beans/DrillPBError.java | 265 ++
.../exec/proto/beans/DrillServiceInstance.java | 209 ++
.../exec/proto/beans/DrillbitEndpoint.java | 253 ++
.../drill/exec/proto/beans/FragmentHandle.java | 209 ++
.../exec/proto/beans/FragmentRecordBatch.java | 278 ++
.../drill/exec/proto/beans/FragmentState.java | 57 +
.../drill/exec/proto/beans/FragmentStatus.java | 189 ++
.../exec/proto/beans/MajorFragmentProfile.java | 197 ++
.../drill/exec/proto/beans/MetricValue.java | 207 ++
.../exec/proto/beans/MinorFragmentProfile.java | 355 ++
.../apache/drill/exec/proto/beans/NamePart.java | 237 ++
.../drill/exec/proto/beans/NodeStatus.java | 185 ++
.../drill/exec/proto/beans/OperatorProfile.java | 317 ++
.../drill/exec/proto/beans/ParsingError.java | 229 ++
.../drill/exec/proto/beans/PlanFragment.java | 481 +++
.../apache/drill/exec/proto/beans/Property.java | 199 ++
.../apache/drill/exec/proto/beans/QueryId.java | 185 ++
.../drill/exec/proto/beans/QueryProfile.java | 309 ++
.../drill/exec/proto/beans/QueryResult.java | 445 +++
.../exec/proto/beans/QueryResultsMode.java | 47 +
.../drill/exec/proto/beans/QueryType.java | 51 +
.../drill/exec/proto/beans/RecordBatchDef.java | 219 ++
.../drill/exec/proto/beans/RequestResults.java | 187 ++
.../apache/drill/exec/proto/beans/Roles.java | 256 ++
.../drill/exec/proto/beans/RpcChannel.java | 51 +
.../drill/exec/proto/beans/RpcFailure.java | 229 ++
.../drill/exec/proto/beans/RpcHeader.java | 207 ++
.../apache/drill/exec/proto/beans/RpcMode.java | 51 +
.../apache/drill/exec/proto/beans/RpcType.java | 65 +
.../apache/drill/exec/proto/beans/RunQuery.java | 207 ++
.../drill/exec/proto/beans/SerializedField.java | 311 ++
.../drill/exec/proto/beans/StreamProfile.java | 207 ++
.../drill/exec/proto/beans/UserCredentials.java | 163 +
.../drill/exec/proto/beans/UserProperties.java | 175 +
.../exec/proto/beans/UserToBitHandshake.java | 255 ++
.../drill/exec/proto/beans/ValueMode.java | 51 +
.../drill/exec/proto/beans/ViewPointer.java | 185 ++
.../drill/exec/proto/beans/WorkQueueStatus.java | 209 ++
protocol/src/main/protobuf/BitControl.proto | 2 +-
.../src/main/protobuf/ExecutionProtos.proto | 6 +
protocol/src/main/protobuf/UserBitShared.proto | 2 +-
168 files changed, 23271 insertions(+), 2362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
----------------------------------------------------------------------
diff --git a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
index 9737e5d..ba6d48b 100644
--- a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
+++ b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
@@ -39,6 +39,8 @@ import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.PathSegment.ArraySegment;
import org.apache.drill.common.types.*;
import org.apache.drill.common.types.TypeProtos.*;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.exceptions.ExpressionParsingException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
index a2942b6..e8fdebf 100644
--- a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
+++ b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
@@ -33,6 +33,7 @@ public class PlanProperties {
public Generator generator;
public ResultMode resultMode;
public JSONOptions options;
+ public int queue;
// @JsonInclude(Include.NON_NULL)
public static class Generator{
@@ -53,9 +54,11 @@ public class PlanProperties {
@JsonProperty("generator") Generator generator,
@JsonProperty("type") PlanType type,
@JsonProperty("mode") ResultMode resultMode,
- @JsonProperty("options") JSONOptions options
+ @JsonProperty("options") JSONOptions options,
+ @JsonProperty("queue") int queue
) {
this.version = version;
+ this.queue = queue;
this.generator = generator;
this.type = type;
this.resultMode = resultMode == null ? ResultMode.EXEC : resultMode;
@@ -72,6 +75,7 @@ public class PlanProperties {
private PlanType type;
private ResultMode mode = ResultMode.EXEC;
private JSONOptions options;
+ private int queueNumber = 0;
public PlanPropertiesBuilder type(PlanType type) {
this.type = type;
@@ -93,6 +97,11 @@ public class PlanProperties {
return this;
}
+ public PlanPropertiesBuilder queue(int queueNumber){
+ this.queueNumber = queueNumber;
+ return this;
+ }
+
public PlanPropertiesBuilder options(JSONOptions options){
this.options = options;
return this;
@@ -104,7 +113,7 @@ public class PlanProperties {
}
public PlanProperties build() {
- return new PlanProperties(version, generator, type, mode, options);
+ return new PlanProperties(version, generator, type, mode, options, queueNumber);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..0e93f7e
--- /dev/null
+++ b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,11 @@
+{
+ "storage":{
+ hbase : {
+ type:"hbase",
+ config : {
+ "hbase.zookeeper.quorum" : "localhost",
+ "hbase.zookeeper.property.clientPort" : 2181
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/contrib/storage-hbase/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/storage-plugins.json b/contrib/storage-hbase/src/test/resources/storage-plugins.json
deleted file mode 100644
index 0e93f7e..0000000
--- a/contrib/storage-hbase/src/test/resources/storage-plugins.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "storage":{
- hbase : {
- type:"hbase",
- config : {
- "hbase.zookeeper.quorum" : "localhost",
- "hbase.zookeeper.property.clientPort" : 2181
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/distribution/src/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/bootstrap-storage-plugins.json b/distribution/src/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..3b1cbd0
--- /dev/null
+++ b/distribution/src/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,70 @@
+{
+ "storage":{
+ dfs: {
+ type: "file",
+ connection: "file:///",
+ workspaces: {
+ "root" : {
+ location: "/",
+ writable: false
+ },
+ "tmp" : {
+ location: "/tmp",
+ writable: true,
+ storageformat: "csv"
+ }
+ },
+ formats: {
+ "psv" : {
+ type: "text",
+ extensions: [ "tbl" ],
+ delimiter: "|"
+ },
+ "csv" : {
+ type: "text",
+ extensions: [ "csv" ],
+ delimiter: ","
+ },
+ "tsv" : {
+ type: "text",
+ extensions: [ "tsv" ],
+ delimiter: "\t"
+ },
+ "parquet" : {
+ type: "parquet"
+ },
+ "json" : {
+ type: "json"
+ }
+ }
+ },
+ cp: {
+ type: "file",
+ connection: "classpath:///"
+ }
+
+ /*,
+ hive : {
+ type:"hive",
+ config :
+ {
+ "hive.metastore.uris" : "",
+ "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
+ "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
+ "fs.default.name" : "file:///",
+ "hive.metastore.sasl.enabled" : "false"
+ }
+ }
+ */
+
+ /*,
+ hbase : {
+ type:"hbase",
+ config : {
+ "hbase.zookeeper.quorum" : "localhost",
+ "hbase.zookeeper.property.clientPort" : 2181
+ }
+ }
+ */
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/distribution/src/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-plugins.json b/distribution/src/resources/storage-plugins.json
deleted file mode 100644
index 3b1cbd0..0000000
--- a/distribution/src/resources/storage-plugins.json
+++ /dev/null
@@ -1,70 +0,0 @@
-{
- "storage":{
- dfs: {
- type: "file",
- connection: "file:///",
- workspaces: {
- "root" : {
- location: "/",
- writable: false
- },
- "tmp" : {
- location: "/tmp",
- writable: true,
- storageformat: "csv"
- }
- },
- formats: {
- "psv" : {
- type: "text",
- extensions: [ "tbl" ],
- delimiter: "|"
- },
- "csv" : {
- type: "text",
- extensions: [ "csv" ],
- delimiter: ","
- },
- "tsv" : {
- type: "text",
- extensions: [ "tsv" ],
- delimiter: "\t"
- },
- "parquet" : {
- type: "parquet"
- },
- "json" : {
- type: "json"
- }
- }
- },
- cp: {
- type: "file",
- connection: "classpath:///"
- }
-
- /*,
- hive : {
- type:"hive",
- config :
- {
- "hive.metastore.uris" : "",
- "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
- "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
- "fs.default.name" : "file:///",
- "hive.metastore.sasl.enabled" : "false"
- }
- }
- */
-
- /*,
- hbase : {
- type:"hbase",
- config : {
- "hbase.zookeeper.quorum" : "localhost",
- "hbase.zookeeper.property.clientPort" : 2181
- }
- }
- */
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 83220ab..ff6f552 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -92,10 +92,10 @@
<version>2.8</version>
</dependency>
<dependency>
- <groupId>org.glassfish.jersey.ext</groupId>
- <artifactId>jersey-mvc-freemarker</artifactId>
- <version>2.8</version>
- </dependency>
+ <groupId>org.glassfish.jersey.ext</groupId>
+ <artifactId>jersey-mvc-freemarker</artifactId>
+ <version>2.8</version>
+ </dependency>
<dependency>
<groupId>net.hydromatic</groupId>
<artifactId>optiq-core</artifactId>
@@ -190,14 +190,10 @@
<version>1.30</version>
</dependency>
<dependency>
- <groupId>com.netflix.curator</groupId>
+ <groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
- <version>1.1.9</version>
+ <version>2.5.0</version>
<exclusions>
- <!-- <exclusion> -->
- <!-- <artifactId>netty</artifactId> -->
- <!-- <groupId>org.jboss.netty</groupId> -->
- <!-- </exclusion> -->
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
@@ -231,12 +227,12 @@
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
- <version>6.0.1.Final</version>
+ <version>6.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-tree</artifactId>
- <version>6.0.1.Final</version>
+ <version>6.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 1ece198..5063f63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -74,5 +74,7 @@ public interface ExecConstants {
public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
public static final String HTTP_ENABLE = "drill.exec.http.enabled";
public static final String HTTP_PORT = "drill.exec.http.port";
+ public static final String SYS_TABLES_LOCAL_PATH = "drill.exec.sys.tables.local.path";
+ public static final String SYS_TABLES_LOCAL_ENABLE_WRITE = "drill.exec.sys.tables.local.write";
public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
deleted file mode 100644
index f7b9eed..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public abstract class AbstractDataSerializable extends LoopedAbstractDrillSerializable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractDataSerializable.class);
-
- @Override
- public abstract void read(DataInput input) throws IOException;
-
- @Override
- public abstract void write(DataOutput output) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index aa87162..1c5de85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -17,9 +17,11 @@
*/
package org.apache.drill.exec.cache;
+import java.util.Map;
+
import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.google.protobuf.Message;
public interface DistributedCache extends AutoCloseable{
@@ -27,13 +29,158 @@ public interface DistributedCache extends AutoCloseable{
public void run() throws DrillbitStartupException;
-// public void updateLocalQueueLength(int length);
-// public List<WorkQueueStatus> getQueueLengths();
+ public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config);
+ public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> config);
- public PlanFragment getFragment(FragmentHandle handle);
- public void storeFragment(PlanFragment fragment);
- public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz);
- public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz);
- public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz);
public Counter getCounter(String name);
+
+ public static enum SerializationMode {
+ JACKSON(Object.class),
+ DRILL_SERIALIZIABLE(String.class, DrillSerializable.class),
+ PROTOBUF(String.class, Message.class);
+
+ private final Class<?>[] classes;
+ private SerializationMode(Class<?>... classes){
+ this.classes = classes;
+ }
+
+ public void checkClass(Class<?> classToCheck){
+ for(Class<?> c : classes){
+ if(c.isAssignableFrom(classToCheck)) return;
+ }
+
+ throw new UnsupportedOperationException(String.format("You are trying to serialize the class %s using the serialization mode %s. This is not allowed.", classToCheck.getName(), this.name()));
+ }
+ }
+
+ public static class CacheConfig<K, V>{
+ private final Class<K> keyClass;
+ private final Class<V> valueClass;
+ private final String name;
+ private final SerializationMode mode;
+
+ public CacheConfig(Class<K> keyClass, Class<V> valueClass, String name, SerializationMode mode) {
+ super();
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ this.name = name;
+ this.mode = mode;
+ }
+
+ public Class<K> getKeyClass() {
+ return keyClass;
+ }
+
+ public Class<V> getValueClass() {
+ return valueClass;
+ }
+
+ public SerializationMode getMode() {
+ return mode;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((keyClass == null) ? 0 : keyClass.hashCode());
+ result = prime * result + ((mode == null) ? 0 : mode.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((valueClass == null) ? 0 : valueClass.hashCode());
+ return result;
+ }
+
+ public static <V> CacheConfigBuilder<String, V> newBuilder(Class<V> valueClass) {
+ return newBuilder(String.class, valueClass);
+ }
+
+ public static <K, V> CacheConfigBuilder<K, V> newBuilder(Class<K> keyClass, Class<V> valueClass) {
+ return new CacheConfigBuilder<K, V>(keyClass, valueClass);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CacheConfig other = (CacheConfig) obj;
+ if (keyClass == null) {
+ if (other.keyClass != null)
+ return false;
+ } else if (!keyClass.equals(other.keyClass))
+ return false;
+ if (mode != other.mode)
+ return false;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (valueClass == null) {
+ if (other.valueClass != null)
+ return false;
+ } else if (!valueClass.equals(other.valueClass))
+ return false;
+ return true;
+ }
+
+
+ }
+
+ public static class CacheConfigBuilder<K, V> {
+
+ private Class<K> keyClass;
+ private Class<V> valueClass;
+ private String name;
+ private SerializationMode mode = SerializationMode.DRILL_SERIALIZIABLE;
+
+ private CacheConfigBuilder(Class<K> keyClass, Class<V> valueClass) {
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ this.name = keyClass.getName();
+ }
+
+
+ public CacheConfigBuilder<K, V> mode(SerializationMode mode){
+ this.mode = mode;
+ return this;
+ }
+
+ public CacheConfigBuilder<K, V> proto(){
+ this.mode = SerializationMode.PROTOBUF;
+ return this;
+ }
+
+ public CacheConfigBuilder<K, V> jackson(){
+ this.mode = SerializationMode.JACKSON;
+ return this;
+ }
+
+ public CacheConfigBuilder<K, V> drill(){
+ this.mode = SerializationMode.DRILL_SERIALIZIABLE;
+ return this;
+ }
+
+
+ public CacheConfigBuilder<K, V> name(String name){
+ this.name = name;
+ return this;
+ }
+
+ public CacheConfig<K, V> build(){
+ mode.checkClass(keyClass);
+ mode.checkClass(valueClass);
+ return new CacheConfig<K, V>(keyClass, valueClass, name, mode);
+ }
+
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
index 1bee5fc..2411434 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -20,10 +20,13 @@ package org.apache.drill.exec.cache;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public interface DistributedMap<V extends DrillSerializable> extends Iterable<Map.Entry<String, V>>{
+public interface DistributedMap<K, V>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
- public V get(String key);
- public void put(String key, V value);
- public void putIfAbsent(String key, V value);
- public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit);
+ public V get(K key);
+ public void put(K key, V value);
+ public void delete(K key);
+ public void putIfAbsent(K key, V value);
+ public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit);
+ public Iterable<Map.Entry<K, V>> getLocalEntries();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
deleted file mode 100644
index 453dbfb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-
-public interface DistributedMapDeserializer<V extends DrillSerializable> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMapDeserializer.class);
-
- public V put(String key, byte[] value) throws IOException;
-
- public Class getValueClass();
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
index 645fb7c..bf06646 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.cache;
import java.util.Collection;
-public interface DistributedMultiMap<V extends DrillSerializable> {
+public interface DistributedMultiMap<K, V> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class);
- public Collection<V> get(String key);
- public void put(String key, V value);
+ public Collection<V> get(K key);
+ public void put(K key, V value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
deleted file mode 100644
index edf31aa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-
-import org.apache.drill.common.util.DataInputInputStream;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-public class JacksonAdvancedSerializer<T> implements StreamSerializer<T> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonAdvancedSerializer.class);
-
- private final Class<?> clazz;
- private final ObjectMapper mapper;
- private final int id;
-
- public JacksonAdvancedSerializer(SerializationDefinition def, ObjectMapper mapper){
- this.clazz = def.clazz;
- this.mapper = mapper;
- this.id = def.id;
- }
-
- @Override
- public int getTypeId() {
- return id;
- }
-
- @Override
- public void destroy() {
- }
-
- @Override
- public void write(ObjectDataOutput out, T object) throws IOException {
- out.write(mapper.writeValueAsBytes(object));
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T read(ObjectDataInput in) throws IOException {
- return (T) mapper.readValue(DataInputInputStream.constructInputStream(in), clazz);
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
deleted file mode 100644
index 617c356..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.planner.logical.StoragePlugins;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public abstract class JacksonDrillSerializable<T> extends LoopedAbstractDrillSerializable implements DrillSerializable{
- private ObjectMapper mapper;
- private T obj;
- private Class<T> clazz;
-
- public JacksonDrillSerializable(DrillbitContext context, T obj, Class<T> clazz) {
- this(clazz);
- this.mapper = context.getConfig().getMapper();
- this.obj = obj;
- }
-
- public JacksonDrillSerializable(Class<T> clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public void readFromStream(InputStream input) throws IOException {
- mapper = DrillConfig.create().getMapper();
- obj = (T) mapper.readValue(input, clazz);
- }
-
- @Override
- public void writeToStream(OutputStream output) throws IOException {
- output.write(mapper.writeValueAsBytes(obj));
- }
-
- public T getObj() {
- return obj;
- }
-
- public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
-
- public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
- super(context, obj, StoragePlugins.class);
- }
-
- public StoragePluginsSerializable(BufferAllocator allocator) {
- super(StoragePlugins.class);
- }
-
- public StoragePluginsSerializable() {
- super(StoragePlugins.class);
- }
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
deleted file mode 100644
index 247c79e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.OutputStream;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-
-public abstract class JacksonSerializable implements DrillSerializable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonSerializable.class);
-
- private void fail(){
- throw new UnsupportedOperationException("Need to register serialization config for class " + this.getClass().getName()); // rely on external serializer
-
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- fail();
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- fail();
- }
-
- @Override
- public void read(DataInput input) throws IOException {
- fail();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- fail();
- }
-
- @Override
- public void readFromStream(InputStream input) throws IOException {
- fail();
- }
-
- @Override
- public void writeToStream(OutputStream output) throws IOException {
- fail();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
deleted file mode 100644
index f48aae1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-public abstract class ProtoSerializable<V extends Message> extends AbstractStreamSerializable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoSerializable.class);
-
- private Parser<V> protoParser;
- private V obj;
-
- ProtoSerializable(Parser<V> protoParser, V obj) {
- super();
- this.protoParser = protoParser;
- this.obj = obj;
- }
-
- public V getObject(){
- return obj;
- }
-
- @Override
- public void readFromStream(InputStream input) throws IOException {
- obj = protoParser.parseDelimitedFrom(input);
- }
-
- @Override
- public void writeToStream(OutputStream output) throws IOException {
- obj.writeDelimitedTo(output);
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((obj == null) ? 0 : obj.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ProtoSerializable other = (ProtoSerializable) obj;
- if (this.obj == null) {
- if (other.obj != null)
- return false;
- } else if (!this.obj.equals(other.obj))
- return false;
- return true;
- }
-
- public static class PlanFragmentSerializable extends ProtoSerializable<PlanFragment>{
- public PlanFragmentSerializable(PlanFragment obj) {super(PlanFragment.PARSER, obj);}
- public PlanFragmentSerializable(){this(null);}
- }
- public static class FragmentHandleSerializable extends ProtoSerializable<FragmentHandle>{
- public FragmentHandleSerializable(FragmentHandle obj) {super(FragmentHandle.PARSER, obj);}
- public FragmentHandleSerializable(){this(null);}
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
deleted file mode 100644
index 9d6b645..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-public abstract class ProtobufDrillSerializable<T extends Message> extends LoopedAbstractDrillSerializable implements DrillSerializable{
- private Parser<T> parser;
- private T obj;
-
- public ProtobufDrillSerializable(T obj){
- this.parser = (Parser<T>) obj.getParserForType();
- this.obj = obj;
- }
-
- public ProtobufDrillSerializable(Parser<T> parser) {
- this.parser = parser;
- }
-
- @Override
- public void readFromStream(InputStream input) throws IOException {
- obj = parser.parseDelimitedFrom(input);
- }
-
- @Override
- public void writeToStream(OutputStream output) throws IOException {
- obj.writeDelimitedTo(output);
- }
-
- public T getObj() {
- return obj;
- }
-
- public static class CQueryProfile extends ProtobufDrillSerializable<QueryProfile>{
-
- public CQueryProfile(BufferAllocator allocator){
- super(QueryProfile.PARSER);
- }
-
- public CQueryProfile() {
- super(QueryProfile.PARSER);
-
- }
-
- public CQueryProfile(QueryProfile obj) {
- super(obj);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
index 711ddf1..a5df73a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
@@ -19,14 +19,19 @@ package org.apache.drill.exec.cache;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.server.options.OptionValue;
public enum SerializationDefinition {
OPTION(3002, OptionValue.class),
STORAGE_PLUGINS(3003, StoragePlugins.class),
- FRAGMENT_STATUS(3004, FragmentStatus.class)
+ FRAGMENT_STATUS(3004, FragmentStatus.class),
+ FRAGMENT_HANDLE(3005, FragmentHandle.class),
+ PLAN_FRAGMENT(3006, PlanFragment.class)
;
+
public final int id;
public final Class<?> clazz;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 63ed592..cff56d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -98,7 +98,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
VectorContainer container = new VectorContainer();
UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
recordCount = batchDef.getRecordCount();
- if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+ if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
if (sv2 == null) {
sv2 = new SelectionVector2(allocator);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
deleted file mode 100644
index bac2323..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
-
- private BufferAllocator allocator;
-
- public HCVectorAccessibleSerializer(BufferAllocator allocator) {
- this.allocator = allocator;
- }
-
- public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
- VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
- va.readFromStream(DataInputInputStream.constructInputStream(in));
- return va;
- }
-
- public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
- va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
- }
-
- public void destroy() {}
-
- public int getTypeId() {
- return 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
deleted file mode 100644
index f83456b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.Counter;
-import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.DistributedMultiMap;
-import org.apache.drill.exec.cache.DrillSerializable;
-import org.apache.drill.exec.cache.JacksonAdvancedSerializer;
-import org.apache.drill.exec.cache.SerializationDefinition;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
-import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HWorkQueueStatus;
-import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HandlePlan;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.config.SerializerConfig;
-import com.hazelcast.core.DuplicateInstanceNameException;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-public class HazelCache implements DistributedCache {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
-
- private final String instanceName;
- private HazelcastInstance instance;
- private ITopic<HWorkQueueStatus> workQueueLengths;
- private HandlePlan fragments;
- private Cache<WorkQueueStatus, Integer> endpoints;
- private BufferAllocator allocator;
- private DrillConfig config;
-
- public HazelCache(DrillConfig config, BufferAllocator allocator) {
- this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
- this.allocator = allocator;
- this.config = config;
- }
-
- private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
- SerializerConfig sc = new SerializerConfig().setImplementation(serializer).setTypeClass(clazz);
- c.getSerializationConfig().addSerializerConfig(sc);
- }
-
- @SuppressWarnings("rawtypes")
- private <T> void addJSer(Config c, SerializationDefinition d){
- SerializerConfig sc = new SerializerConfig().setImplementation(new JacksonAdvancedSerializer(d, config.getMapper())).setTypeClass(d.clazz);
- c.getSerializationConfig().addSerializerConfig(sc);
- }
-
-
- private class Listener implements MessageListener<HWorkQueueStatus>{
-
- @Override
- public void onMessage(Message<HWorkQueueStatus> wrapped) {
- logger.debug("Received new queue length message.");
- endpoints.put(wrapped.getMessageObject().get(), 0);
- }
-
- }
-
- public void run() {
- Config c = new Config();
- addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
- addJSer(c, SerializationDefinition.OPTION);
- addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
-
- c.setInstanceName(instanceName);
- c.getGroupConfig().setName(instanceName);
- for (String s : DrillConfig.create().getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
- logger.debug("Adding interface: {}", s);
- c.getNetworkConfig().getInterfaces().setEnabled(true).addInterface(s);
- }
-
- instance = getInstanceOrCreateNew(c);
- workQueueLengths = instance.getTopic("queue-length");
- fragments = new HandlePlan(instance);
- endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
- workQueueLengths.addMessageListener(new Listener());
- }
-
- private HazelcastInstance getInstanceOrCreateNew(Config c) {
- for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
- if (instance.getName().equals(this.instanceName))
- return instance;
- }
- try {
- return Hazelcast.newHazelcastInstance(c);
- } catch (DuplicateInstanceNameException e) {
- return getInstanceOrCreateNew(c);
- }
- }
-
-// @Override
-// public void updateLocalQueueLength(int length) {
-// workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
-// .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
-// }
-//
-// @Override
-// public List<WorkQueueStatus> getQueueLengths() {
-// return Lists.newArrayList(endpoints.asMap().keySet());
-// }
-
- @Override
- public void close() throws IOException {
- this.instance.getLifecycleService().shutdown();
- }
-
- @Override
- public PlanFragment getFragment(FragmentHandle handle) {
- return this.fragments.get(handle);
- }
-
- @Override
- public void storeFragment(PlanFragment fragment) {
- fragments.put(fragment.getHandle(), fragment);
- }
-
-
- @Override
- public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
- com.hazelcast.core.MultiMap<String, V> mmap = this.instance.getMultiMap(clazz.toString());
- return new HCDistributedMultiMapImpl<V>(mmap, clazz);
- }
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
- return getNamedMap(clazz.getName(), clazz);
- }
-
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
- IMap<String, V> imap = this.instance.getMap(name);
- MapConfig myMapConfig = new MapConfig();
- myMapConfig.setBackupCount(0);
- myMapConfig.setReadBackupData(true);
- instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
- return new HCDistributedMapImpl<V>(imap);
- }
-
- @Override
- public Counter getCounter(String name) {
- return new HCCounterImpl(this.instance.getAtomicLong(name));
- }
-
-
-
- public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
- private final IMap<String, V> m;
-
- public HCDistributedMapImpl(IMap<String, V> m) {
- this.m = m;
- }
-
- public V get(String key) {
- return m.get(key);
- }
-
- public void put(String key, V value) {
- m.put(key, value);
- }
-
- public void putIfAbsent(String key, V value) {
- m.putIfAbsent(key, value);
- }
-
- public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
- m.putIfAbsent(key, value, ttl, timeunit);
-
- }
-
- @Override
- public Iterator<Entry<String, V>> iterator() {
- return m.entrySet().iterator();
- }
-
-
- }
-
- public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
- private com.hazelcast.core.MultiMap<String, V> mmap;
-
- public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
- this.mmap = mmap;
- }
-
- public Collection<V> get(String key) {
- List<V> list = Lists.newArrayList();
- for (V v : mmap.get(key)) {
- list.add(v);
- }
- return list;
- }
-
- @Override
- public void put(String key, V value) {
- mmap.put(key, value);
- }
- }
-
- public static class HCCounterImpl implements Counter {
- private IAtomicLong n;
-
- public HCCounterImpl(IAtomicLong n) {
- this.n = n;
- }
-
- public long get() {
- return n.get();
- }
-
- public long incrementAndGet() {
- return n.incrementAndGet();
- }
-
- public long decrementAndGet() {
- return n.decrementAndGet();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
deleted file mode 100644
index d992aa7..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.hazelcast.core.HazelcastInstance;
-
-public class ProtoBufImpl {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
-
- public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
- public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);}
- public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
- }
-
- public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
- public HFragmentHandle() {super(FragmentHandle.PARSER);}
- public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
- }
-
- public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
- public HPlanFragment() {super(PlanFragment.PARSER);}
- public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
- }
-
- public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
- public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
- public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);}
- public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);}
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
deleted file mode 100644
index 23a4e08..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import java.io.IOException;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
-
-public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
-
- T value;
- final Parser<T> parser;
-
- public ProtoBufWrap(Parser<T> parser){
- this(null, parser);
- }
-
- public ProtoBufWrap(T value, Parser<T> parser){
- this.value = value;
- this.parser = parser;
- }
-
- @Override
- public void readData(ObjectDataInput arg0) throws IOException {
- int len = arg0.readShort();
- byte[] b = new byte[len];
- arg0.readFully(b);
- this.value = parser.parseFrom(b);
- }
-
- @Override
- public void writeData(ObjectDataOutput arg0) throws IOException {
- byte[] b = value.toByteArray();
- if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
- arg0.writeShort(b.length);
- arg0.write(b);
- }
-
- protected T get() {
- return value;
- }
-
- protected void set(T value) {
- this.value = value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
deleted file mode 100644
index 72d793a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IMap;
-
-public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
-
- private IMap<HK, HV> hzMap;
-
- public ProtoMap(HazelcastInstance instance, String mapName){
- hzMap = instance.getMap(mapName);
- }
-
- public V get(K key){
- Preconditions.checkNotNull(key);
- HK hk = getNewKey(key);
- HV hv = hzMap.get(hk);
- if(hv == null) return null;
- return hv.get();
- }
-
- public V put(K key, V value){
- Preconditions.checkNotNull(key);
- Preconditions.checkNotNull(value);
- HV oldValue = hzMap.put(getNewKey(key), getNewValue(value));
- return oldValue == null ? null : oldValue.get();
- }
-
- public abstract HK getNewKey(K key);
- public abstract HV getNewValue(V key);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
index f56f19a..fb346ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -19,10 +19,10 @@ package org.apache.drill.exec.cache.infinispan;
import java.io.IOException;
import java.util.Collection;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.config.DrillConfig;
@@ -31,10 +31,8 @@ import org.apache.drill.exec.cache.Counter;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.DistributedMap;
import org.apache.drill.exec.cache.DistributedMultiMap;
-import org.apache.drill.exec.cache.DrillSerializable;
import org.apache.drill.exec.cache.SerializationDefinition;
-import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable;
-import org.apache.drill.exec.cache.ProtoSerializable.PlanFragmentSerializable;
+import org.apache.drill.exec.cache.local.LocalCache.LocalCounterImpl;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -46,7 +44,6 @@ import org.infinispan.atomic.DeltaAware;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
@@ -57,41 +54,65 @@ import org.jgroups.protocols.COUNTER;
import org.jgroups.protocols.FRAG2;
import org.jgroups.stack.ProtocolStack;
+import com.google.hive12.common.collect.Maps;
+
public class ICache implements DistributedCache{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class);
private EmbeddedCacheManager manager;
private ForkChannel cacheChannel;
private final CounterService counters;
- private final Cache<FragmentHandleSerializable, PlanFragmentSerializable> fragments;
+ private final boolean local;
+ private volatile ConcurrentMap<String, Counter> localCounters;
- public ICache(DrillConfig config, BufferAllocator allocator) throws Exception {
+ public ICache(DrillConfig config, BufferAllocator allocator, boolean local) throws Exception {
String clusterName = config.getString(ExecConstants.SERVICE_NAME);
- GlobalConfiguration gc = new GlobalConfigurationBuilder() //
- .transport() //
+ this.local = local;
+
+ final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_ASYNC;
+ GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+
+ if(!local){
+ gcb.transport() //
.defaultTransport() //
- .clusterName(clusterName) //;
- //
- .serialization() //
+ .clusterName(clusterName);
+ }
+
+ gcb.serialization() //
.addAdvancedExternalizer(new VAAdvancedExternalizer(allocator)) //
.addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.OPTION, config.getMapper())) //
.addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.STORAGE_PLUGINS, config.getMapper())) //
.addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_STATUS, FragmentStatus.PARSER)) //
+ .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_HANDLE, FragmentHandle.PARSER)) //
+ .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.PLAN_FRAGMENT, PlanFragment.PARSER)) //
.build();
Configuration c = new ConfigurationBuilder() //
.clustering() //
- .cacheMode(CacheMode.DIST_ASYNC) //
+ .cacheMode(mode) //
.storeAsBinary().enable() //
.build();
- this.manager = new DefaultCacheManager(gc, c);
- JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
- this.cacheChannel = new ForkChannel(transport.getChannel(), "drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new COUNTER());
- this.fragments = manager.getCache(PlanFragment.class.getName());
- this.counters = new CounterService(this.cacheChannel);
+ this.manager = new DefaultCacheManager(gcb.build(), c);
+
+ if(!local){
+ JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
+ this.cacheChannel = new ForkChannel(transport.getChannel(), "drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new COUNTER());
+ this.counters = new CounterService(this.cacheChannel);
+ }else{
+ this.cacheChannel = null;
+ this.counters = null;
+ }
}
+
+// @Override
+// public <K, V> Map<K, V> getSmallAtomicMap(CacheConfig<K, V> config) {
+// Cache<String, ?> cache = manager.getCache("atomic-maps");
+// return AtomicMapLookup.getAtomicMap(cache, config.getName());
+// }
+
+
@Override
public void close() throws IOException {
manager.stop();
@@ -100,45 +121,45 @@ public class ICache implements DistributedCache{
@Override
public void run() throws DrillbitStartupException {
try {
- cacheChannel.connect("c1");
+ if(local){
+ localCounters = Maps.newConcurrentMap();
+ manager.start();
+ }else{
+ cacheChannel.connect("c1");
+ }
+
} catch (Exception e) {
throw new DrillbitStartupException("Failure while trying to set up JGroups.");
}
}
@Override
- public PlanFragment getFragment(FragmentHandle handle) {
- PlanFragmentSerializable pfs = fragments.get(new FragmentHandleSerializable(handle));
- if(pfs == null) return null;
- return pfs.getObject();
- }
-
- @Override
- public void storeFragment(PlanFragment fragment) {
- fragments.put(new FragmentHandleSerializable(fragment.getHandle()), new PlanFragmentSerializable(fragment));
+ public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> config) {
+ Cache<K, DeltaList<V>> cache = manager.getCache(config.getName());
+ return new IMulti<K, V>(cache, config);
}
@Override
- public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
- Cache<String, DeltaList<V>> cache = manager.getCache(clazz.getName());
- return new IMulti<V>(cache, clazz);
- }
-
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
- Cache<String, V> c = manager.getCache(name);
- return new IMap<V>(c);
- }
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
- return getNamedMap(clazz.getName(), clazz);
+ public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config) {
+ Cache<K, V> c = manager.getCache(config.getName());
+ return new IMap<K, V>(c, config);
}
@Override
public Counter getCounter(String name) {
- return new JGroupsCounter(counters.getOrCreateCounter(name, 0));
+ if(local){
+ Counter c = localCounters.get(name);
+ if (c == null) {
+ localCounters.putIfAbsent(name, new LocalCounterImpl());
+ return localCounters.get(name);
+ } else {
+ return c;
+ }
+
+ }else{
+ return new JGroupsCounter(counters.getOrCreateCounter(name, 0));
+ }
+
}
private class JGroupsCounter implements Counter{
@@ -166,63 +187,68 @@ public class ICache implements DistributedCache{
}
- private class IMap<V extends DrillSerializable> implements DistributedMap<V>{
+ private class IMap<K, V> implements DistributedMap<K, V>{
- private Cache<String, V> cache;
+ private Cache<K, V> cache;
+ private CacheConfig<K, V> config;
-
- public IMap(Cache<String, V> cache) {
+ public IMap(Cache<K, V> cache, CacheConfig<K, V> config) {
super();
this.cache = cache;
+ this.config = config;
+ }
+
+ @Override
+ public Iterable<Entry<K, V>> getLocalEntries() {
+ return cache.entrySet();
}
@Override
- public V get(String key) {
+ public V get(K key) {
return cache.get(key);
}
@Override
- public void put(String key, V value) {
- cache.put(key, value);
+ public void delete(K key) {
+ cache.remove(key);
}
@Override
- public void putIfAbsent(String key, V value) {
- cache.putIfAbsent(key, value);
+ public void put(K key, V value) {
+ cache.put(key, value);
}
@Override
- public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
- cache.putIfAbsent(key, value, ttl, timeUnit);
+ public void putIfAbsent(K key, V value) {
+ cache.putIfAbsent(key, value);
}
@Override
- public Iterator<Entry<String, V>> iterator() {
- return cache.entrySet().iterator();
+ public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
+ cache.putIfAbsent(key, value, ttl, timeUnit);
}
}
- private class IMulti<V extends DrillSerializable> implements DistributedMultiMap<V>{
+ private class IMulti<K, V> implements DistributedMultiMap<K, V>{
- private Cache<String, DeltaList<V>> cache;
- private Class<V> clazz;
+ private Cache<K, DeltaList<V>> cache;
+ private CacheConfig<K, V> config;
- public IMulti(Cache<String, DeltaList<V>> cache, Class<V> clazz) {
+ public IMulti(Cache<K, DeltaList<V>> cache, CacheConfig<K, V> config) {
super();
this.cache = cache;
- this.clazz = clazz;
+ this.config = config;
}
@Override
- public Collection<V> get(String key) {
+ public Collection<V> get(K key) {
return cache.get(key);
}
@Override
- public void put(String key, V value) {
+ public void put(K key, V value) {
cache.put(key, new DeltaList<V>(value));
-// cache.getAdvancedCache().applyDelta(key, new DeltaList<V>(value), key);
}
}
@@ -230,7 +256,7 @@ public class ICache implements DistributedCache{
- private static class DeltaList<V extends DrillSerializable> extends LinkedList<V> implements DeltaAware, Delta{
+ private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta{
/** The serialVersionUID */
private static final long serialVersionUID = 2176345973026460708L;
@@ -270,18 +296,4 @@ public class ICache implements DistributedCache{
}
}
-
-// public void run() {
-// Config c = new Config();
-// SerializerConfig sc = new SerializerConfig() //
-// .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
-// .setTypeClass(VectorAccessibleSerializable.class);
-// c.setInstanceName(instanceName);
-// c.getSerializationConfig().addSerializerConfig(sc);
-// instance = getInstanceOrCreateNew(c);
-// workQueueLengths = instance.getTopic("queue-length");
-// fragments = new HandlePlan(instance);
-// endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
-// workQueueLengths.addMessageListener(new Listener());
-// }
}