You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/22 03:14:48 UTC
[11/24] git commit: ispan
ispan
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8621b682
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8621b682
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8621b682
Branch: refs/heads/diagnostics2
Commit: 8621b682cae0f8c8f58ff2e5b750544113bf52ee
Parents: 5472140
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun May 4 16:58:41 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 09:12:27 2014 -0700
----------------------------------------------------------------------
exec/java-exec/pom.xml | 10 +
.../exec/cache/AbstractDataSerializable.java | 33 ++
.../exec/cache/AbstractStreamSerializable.java | 34 ++
.../drill/exec/cache/CachedVectorContainer.java | 77 +++++
.../drill/exec/cache/DistributedCache.java | 4 +-
.../drill/exec/cache/DrillSerializable.java | 17 +-
.../cache/HCVectorAccessibleSerializer.java | 56 ----
.../org/apache/drill/exec/cache/HazelCache.java | 252 ---------------
.../exec/cache/JacksonDrillSerializable.java | 48 +--
.../org/apache/drill/exec/cache/LocalCache.java | 305 ------------------
.../cache/LoopedAbstractDrillSerializable.java | 80 +++++
.../apache/drill/exec/cache/ProtoBufImpl.java | 49 ---
.../apache/drill/exec/cache/ProtoBufWrap.java | 67 ----
.../org/apache/drill/exec/cache/ProtoMap.java | 52 ----
.../drill/exec/cache/ProtoSerializable.java | 65 ++++
.../cache/VectorAccessibleSerializable.java | 22 +-
.../hazel/HCVectorAccessibleSerializer.java | 58 ++++
.../drill/exec/cache/hazel/HazelCache.java | 258 ++++++++++++++++
.../drill/exec/cache/hazel/ProtoBufImpl.java | 49 +++
.../drill/exec/cache/hazel/ProtoBufWrap.java | 67 ++++
.../apache/drill/exec/cache/hazel/ProtoMap.java | 52 ++++
.../drill/exec/cache/infinispan/ICache.java | 266 ++++++++++++++++
.../infinispan/VAAdvancedExternalizer.java | 72 +++++
.../cache/infinispan/ZookeeperCacheStore.java | 66 ++++
.../drill/exec/cache/local/LocalCache.java | 309 +++++++++++++++++++
.../OrderedPartitionRecordBatch.java | 20 +-
.../org/apache/drill/exec/server/Drillbit.java | 2 +-
.../drill/exec/server/RemoteServiceSet.java | 17 +-
.../java/org/apache/drill/PlanningBase.java | 2 +-
.../java/org/apache/drill/exec/cache/ISpan.java | 94 ++++++
.../drill/exec/cache/TestVectorCache.java | 136 +++++---
.../drill/exec/store/TestOrphanSchema.java | 2 +-
.../drill/exec/store/ischema/OrphanSchema.java | 9 +-
33 files changed, 1750 insertions(+), 900 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index d693630..6d11614 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -224,6 +224,16 @@
<version>3.1.4</version>
</dependency>
<dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-core</artifactId>
+ <version>6.0.1.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-tree</artifactId>
+ <version>6.0.1.Final</version>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.6.1</version>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
new file mode 100644
index 0000000..f7b9eed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+/** Base class for serializables that subclasses encode and decode through DataInput/DataOutput. */
+public abstract class AbstractDataSerializable extends LoopedAbstractDrillSerializable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractDataSerializable.class);
+ // Re-declared abstract here, so every concrete subclass must supply its own DataInput decoding.
+ @Override
+ public abstract void read(DataInput input) throws IOException;
+ // Re-declared abstract here, so every concrete subclass must supply its own DataOutput encoding.
+ @Override
+ public abstract void write(DataOutput output) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java
new file mode 100644
index 0000000..ef488d6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+/** Base class for serializables that subclasses encode and decode through InputStream/OutputStream. */
+public abstract class AbstractStreamSerializable extends LoopedAbstractDrillSerializable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStreamSerializable.class);
+ // Re-declared abstract here, so every concrete subclass must supply its own stream decoding.
+ @Override
+ public abstract void readFromStream(InputStream input) throws IOException;
+ // Re-declared abstract here, so every concrete subclass must supply its own stream encoding.
+ @Override
+ public abstract void writeToStream(OutputStream output) throws IOException;
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
new file mode 100644
index 0000000..1447e28
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.WritableBatch;
+/** Keeps a record batch in serialized form and rebuilds its VectorContainer on demand. */
+public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachedVectorContainer.class);
+  // 'data' is the serialized batch; 'container' stays null until get() deserializes it.
+  private final byte[] data;
+  private final BufferAllocator allocator;
+  private VectorContainer container;
+  /** Serializes {@code batch} to a byte array, then clears the temporary serializable wrapper. */
+  public CachedVectorContainer(WritableBatch batch, BufferAllocator allocator) throws IOException {
+    VectorAccessibleSerializable va = new VectorAccessibleSerializable(batch, allocator);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    va.writeToStream(baos);
+    this.allocator = allocator;
+    this.data = baos.toByteArray();
+    va.clear();
+  }
+  /** Wraps already-serialized bytes; nothing is deserialized until {@link #get()} is called. */
+  public CachedVectorContainer(byte[] data, BufferAllocator allocator) {
+    this.data = data;
+    this.allocator = allocator;
+  }
+  /** Deserializes {@code data} into {@code container}; an IOException becomes an IllegalStateException. */
+  private void construct() {
+    try {
+      VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+      va.readFromStream(new ByteArrayInputStream(data));
+      this.container = va.get();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  /** Returns the deserialized container, building it from {@code data} on first use or after clear(). */
+  public VectorAccessible get() {
+    if (container == null) {
+      construct();
+    }
+    return container;
+  }
+  /** Clears and drops the deserialized container, if any; a no-op when none is currently held. */
+  public void clear() {
+    if (container != null) {
+      container.clear();
+      container = null;
+    }
+  }
+  /** Returns the backing serialized bytes (the internal array itself, not a copy). */
+  public byte[] getData(){
+    return data;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index 65362e0..aa87162 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -17,14 +17,12 @@
*/
package org.apache.drill.exec.cache;
-import java.io.Closeable;
-
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-public interface DistributedCache extends Closeable{
+public interface DistributedCache extends AutoCloseable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedCache.class);
public void run() throws DrillbitStartupException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
index 4f266f7..21ed37c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
@@ -17,19 +17,20 @@
*/
package org.apache.drill.exec.cache;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
-
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
/**
* Classes that can be put in the Distributed Cache must implement this interface.
*/
-public interface DrillSerializable {
+public interface DrillSerializable extends Externalizable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class);
- public void readData(ObjectDataInput input) throws IOException;
+ public void read(DataInput input) throws IOException;
public void readFromStream(InputStream input) throws IOException;
- public void writeData(ObjectDataOutput output) throws IOException;
+ public void write(DataOutput output) throws IOException;
public void writeToStream(OutputStream output) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
deleted file mode 100644
index 0d5ba96..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.StreamSerializer;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
-
- private BufferAllocator allocator;
-
- public HCVectorAccessibleSerializer(BufferAllocator allocator) {
- this.allocator = allocator;
- }
-
- public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
- VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
- va.readFromStream(DataInputInputStream.constructInputStream(in));
- return va;
- }
-
- public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
- va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
- }
-
- public void destroy() {}
-
- public int getTypeId() {
- return 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
deleted file mode 100644
index 0149a57..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
-import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.config.SerializerConfig;
-import com.hazelcast.core.DuplicateInstanceNameException;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-public class HazelCache implements DistributedCache {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
-
- private final String instanceName;
- private HazelcastInstance instance;
- private ITopic<HWorkQueueStatus> workQueueLengths;
- private HandlePlan fragments;
- private Cache<WorkQueueStatus, Integer> endpoints;
- private BufferAllocator allocator;
- private DrillConfig config;
-
- public HazelCache(DrillConfig config, BufferAllocator allocator) {
- this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
- this.allocator = allocator;
- this.config = config;
- }
-
- private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
- SerializerConfig sc = new SerializerConfig().setImplementation(serializer).setTypeClass(clazz);
- c.getSerializationConfig().addSerializerConfig(sc);
- }
-
- @SuppressWarnings("rawtypes")
- private <T> void addJSer(Config c, SerializationDefinition d){
- SerializerConfig sc = new SerializerConfig().setImplementation(new JacksonAdvancedSerializer(d, config.getMapper())).setTypeClass(d.clazz);
- c.getSerializationConfig().addSerializerConfig(sc);
- }
-
-
- private class Listener implements MessageListener<HWorkQueueStatus>{
-
- @Override
- public void onMessage(Message<HWorkQueueStatus> wrapped) {
- logger.debug("Received new queue length message.");
- endpoints.put(wrapped.getMessageObject().get(), 0);
- }
-
- }
-
- public void run() {
- Config c = new Config();
- addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
- addJSer(c, SerializationDefinition.OPTION);
- addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
-
- c.setInstanceName(instanceName);
- c.getGroupConfig().setName(instanceName);
- for (String s : DrillConfig.create().getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
- logger.debug("Adding interface: {}", s);
- c.getNetworkConfig().getInterfaces().setEnabled(true).addInterface(s);
- }
-
- instance = getInstanceOrCreateNew(c);
- workQueueLengths = instance.getTopic("queue-length");
- fragments = new HandlePlan(instance);
- endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
- workQueueLengths.addMessageListener(new Listener());
- }
-
- private HazelcastInstance getInstanceOrCreateNew(Config c) {
- for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
- if (instance.getName().equals(this.instanceName))
- return instance;
- }
- try {
- return Hazelcast.newHazelcastInstance(c);
- } catch (DuplicateInstanceNameException e) {
- return getInstanceOrCreateNew(c);
- }
- }
-
-// @Override
-// public void updateLocalQueueLength(int length) {
-// workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
-// .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
-// }
-//
-// @Override
-// public List<WorkQueueStatus> getQueueLengths() {
-// return Lists.newArrayList(endpoints.asMap().keySet());
-// }
-
- @Override
- public void close() throws IOException {
- this.instance.getLifecycleService().shutdown();
- }
-
- @Override
- public PlanFragment getFragment(FragmentHandle handle) {
- return this.fragments.get(handle);
- }
-
- @Override
- public void storeFragment(PlanFragment fragment) {
- fragments.put(fragment.getHandle(), fragment);
- }
-
-
- @Override
- public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
- com.hazelcast.core.MultiMap<String, V> mmap = this.instance.getMultiMap(clazz.toString());
- return new HCDistributedMultiMapImpl<V>(mmap, clazz);
- }
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
- return getNamedMap(clazz.getName(), clazz);
- }
-
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
- IMap<String, V> imap = this.instance.getMap(name);
- MapConfig myMapConfig = new MapConfig();
- myMapConfig.setBackupCount(0);
- myMapConfig.setReadBackupData(true);
- instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
- return new HCDistributedMapImpl<V>(imap);
- }
-
- @Override
- public Counter getCounter(String name) {
- return new HCCounterImpl(this.instance.getAtomicLong(name));
- }
-
-
-
- public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
- private final IMap<String, V> m;
-
- public HCDistributedMapImpl(IMap<String, V> m) {
- this.m = m;
- }
-
- public V get(String key) {
- return m.get(key);
- }
-
- public void put(String key, V value) {
- m.put(key, value);
- }
-
- public void putIfAbsent(String key, V value) {
- m.putIfAbsent(key, value);
- }
-
- public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
- m.putIfAbsent(key, value, ttl, timeunit);
-
- }
-
- @Override
- public Iterator<Entry<String, V>> iterator() {
- return m.entrySet().iterator();
- }
-
-
- }
-
- public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
- private com.hazelcast.core.MultiMap<String, V> mmap;
-
- public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
- this.mmap = mmap;
- }
-
- public Collection<V> get(String key) {
- List<V> list = Lists.newArrayList();
- for (V v : mmap.get(key)) {
- list.add(v);
- }
- return list;
- }
-
- @Override
- public void put(String key, V value) {
- mmap.put(key, value);
- }
- }
-
- public static class HCCounterImpl implements Counter {
- private IAtomicLong n;
-
- public HCCounterImpl(IAtomicLong n) {
- this.n = n;
- }
-
- public long get() {
- return n.get();
- }
-
- public long incrementAndGet() {
- return n.incrementAndGet();
- }
-
- public long decrementAndGet() {
- return n.decrementAndGet();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
index dcfc1ec..617c356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
@@ -17,47 +17,39 @@
*/
package org.apache.drill.exec.cache;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.server.DrillbitContext;
-import java.io.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
-public abstract class JacksonDrillSerializable<T> implements DrillSerializable, DataSerializable{
+public abstract class JacksonDrillSerializable<T> extends LoopedAbstractDrillSerializable implements DrillSerializable{
private ObjectMapper mapper;
private T obj;
+ private Class<T> clazz;
- public JacksonDrillSerializable(DrillbitContext context, T obj) {
+ public JacksonDrillSerializable(DrillbitContext context, T obj, Class<T> clazz) {
+ this(clazz);
this.mapper = context.getConfig().getMapper();
this.obj = obj;
}
- public JacksonDrillSerializable() {
+ public JacksonDrillSerializable(Class<T> clazz) {
+ this.clazz = clazz;
}
@Override
- public void readData(ObjectDataInput input) throws IOException {
- readFromStream(DataInputInputStream.constructInputStream(input));
- }
-
- public void readFromStream(InputStream input, Class clazz) throws IOException {
+ public void readFromStream(InputStream input) throws IOException {
mapper = DrillConfig.create().getMapper();
obj = (T) mapper.readValue(input, clazz);
}
@Override
- public void writeData(ObjectDataOutput output) throws IOException {
- writeToStream(DataOutputOutputStream.constructOutputStream(output));
- }
-
- @Override
public void writeToStream(OutputStream output) throws IOException {
output.write(mapper.writeValueAsBytes(obj));
}
@@ -66,4 +58,20 @@ public abstract class JacksonDrillSerializable<T> implements DrillSerializable,
return obj;
}
+ public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
+ // Binds the generic base to StoragePlugins: every constructor passes StoragePlugins.class to it.
+ public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
+ super(context, obj, StoragePlugins.class);
+ }
+ // The allocator argument is not used; only the target class is handed to the base constructor.
+ public StoragePluginsSerializable(BufferAllocator allocator) {
+ super(StoragePlugins.class);
+ }
+ // No-argument form; only the target class is handed to the base constructor.
+ public StoragePluginsSerializable() {
+ super(StoragePlugins.class);
+ }
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
deleted file mode 100644
index 0fb4b82..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-
-public class LocalCache implements DistributedCache {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
-
- private volatile Map<FragmentHandle, PlanFragment> handles;
- private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps;
- private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
- private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
- private volatile ConcurrentMap<String, Counter> counters;
- private static final BufferAllocator allocator = new TopLevelAllocator();
-
- private static final ObjectMapper mapper = DrillConfig.create().getMapper();
-
- @Override
- public void close() throws IOException {
- handles = null;
- }
-
- @Override
- public void run() throws DrillbitStartupException {
- handles = Maps.newConcurrentMap();
- maps = Maps.newConcurrentMap();
- multiMaps = Maps.newConcurrentMap();
- counters = Maps.newConcurrentMap();
- namedMaps = Maps.newConcurrentMap();
- }
-
- @Override
- public PlanFragment getFragment(FragmentHandle handle) {
-// logger.debug("looking for fragment with handle: {}", handle);
- return handles.get(handle);
- }
-
- @Override
- public void storeFragment(PlanFragment fragment) {
-// logger.debug("Storing fragment: {}", fragment);
- handles.put(fragment.getHandle(), fragment);
- }
-
- @Override
- public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
- DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz);
- if (mmap == null) {
- multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz));
- return (DistributedMultiMap<V>) multiMaps.get(clazz);
- } else {
- return mmap;
- }
- }
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
- DistributedMap m = maps.get(clazz);
- if (m == null) {
- maps.putIfAbsent(clazz, new LocalDistributedMapImpl<V>(clazz));
- return (DistributedMap<V>) maps.get(clazz);
- } else {
- return m;
- }
- }
-
-
- @Override
- public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
- DistributedMap m = namedMaps.get(clazz);
- if (m == null) {
- namedMaps.putIfAbsent(name, new LocalDistributedMapImpl<V>(clazz));
- return (DistributedMap<V>) namedMaps.get(name);
- } else {
- return m;
- }
- }
-
- @Override
- public Counter getCounter(String name) {
- Counter c = counters.get(name);
- if (c == null) {
- counters.putIfAbsent(name, new LocalCounterImpl());
- return counters.get(name);
- } else {
- return c;
- }
- }
-
- public static ByteArrayDataOutput serialize(DrillSerializable obj) {
- if(obj instanceof JacksonSerializable){
- try{
- ByteArrayDataOutput out = ByteStreams.newDataOutput();
- out.write(mapper.writeValueAsBytes(obj));
- return out;
- }catch(Exception e){
- throw new RuntimeException(e);
- }
- }
-
- ByteArrayDataOutput out = ByteStreams.newDataOutput();
- OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
- try {
- obj.writeToStream(outputStream);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- try {
- outputStream.flush();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return out;
- }
-
- public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
- if(JacksonSerializable.class.isAssignableFrom(clazz)){
- try{
- return (V) mapper.readValue(bytes, clazz);
- }catch(Exception e){
- throw new RuntimeException(e);
- }
- }
-
- ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
- InputStream inputStream = DataInputInputStream.constructInputStream(in);
- try {
- V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
- obj.readFromStream(inputStream);
- return obj;
- } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static class LocalDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
- private ArrayListMultimap<String, ByteArrayDataOutput> mmap;
- private Class<V> clazz;
-
- public LocalDistributedMultiMapImpl(Class<V> clazz) {
- mmap = ArrayListMultimap.create();
- this.clazz = clazz;
- }
-
- @Override
- public Collection<V> get(String key) {
- List<V> list = Lists.newArrayList();
- for (ByteArrayDataOutput o : mmap.get(key)) {
- list.add(deserialize(o.toByteArray(), this.clazz));
- }
- return list;
- }
-
- @Override
- public void put(String key, DrillSerializable value) {
- mmap.put(key, serialize(value));
- }
- }
-
- public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
- protected ConcurrentMap<String, ByteArrayDataOutput> m;
- protected Class<V> clazz;
-
- public LocalDistributedMapImpl(Class<V> clazz) {
- m = Maps.newConcurrentMap();
- this.clazz = clazz;
- }
-
- @Override
- public V get(String key) {
- if (m.get(key) == null) return null;
- ByteArrayDataOutput b = m.get(key);
- byte[] bytes = b.toByteArray();
- return (V) deserialize(m.get(key).toByteArray(), this.clazz);
- }
-
- @Override
- public void put(String key, V value) {
- m.put(key, serialize(value));
- }
-
- @Override
- public void putIfAbsent(String key, V value) {
- m.putIfAbsent(key, serialize(value));
- }
-
- @Override
- public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
- m.putIfAbsent(key, serialize(value));
- logger.warn("Expiration not implemented in local map cache");
- }
-
- private class DeserializingTransformer implements Iterator<Map.Entry<String, V> >{
- private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner;
-
- public DeserializingTransformer(Iterator<Entry<String, ByteArrayDataOutput>> inner) {
- super();
- this.inner = inner;
- }
-
- @Override
- public boolean hasNext() {
- return inner.hasNext();
- }
-
- @Override
- public Entry<String, V> next() {
- return newEntry(inner.next());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- public Entry<String, V> newEntry(final Entry<String, ByteArrayDataOutput> input) {
- return new Map.Entry<String, V>(){
-
- @Override
- public String getKey() {
- return input.getKey();
- }
-
- @Override
- public V getValue() {
- return deserialize(input.getValue().toByteArray(), clazz);
- }
-
- @Override
- public V setValue(V value) {
- throw new UnsupportedOperationException();
- }
-
- };
- }
-
- }
- @Override
- public Iterator<Entry<String, V>> iterator() {
- return new DeserializingTransformer(m.entrySet().iterator());
- }
- }
-
- public static class LocalCounterImpl implements Counter {
- private AtomicLong al = new AtomicLong();
-
- @Override
- public long get() {
- return al.get();
- }
-
- @Override
- public long incrementAndGet() {
- return al.incrementAndGet();
- }
-
- @Override
- public long decrementAndGet() {
- return al.decrementAndGet();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
new file mode 100644
index 0000000..1de030d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+
+/**
+ * Helper class that holds the basic functionality to interchangeably use the different Drill serializable
+ * interfaces.
+ *
+ * <p>{@link #read(DataInput)} and {@link #readFromStream(InputStream)} delegate to each other, as do
+ * {@link #write(DataOutput)} and {@link #writeToStream(OutputStream)}. A subclass therefore has to override one
+ * method of each pair, otherwise the calls recurse forever. This class is package private for that reason: users
+ * should utilize either AbstractDataSerializable or AbstractStreamSerializable instead to avoid infinite loops.
+ */
+abstract class LoopedAbstractDrillSerializable implements DrillSerializable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoopedAbstractDrillSerializable.class);
+
+  /**
+   * Writes this object as a 4-byte length followed by the bytes produced by
+   * {@link #writeToStream(OutputStream)}.
+   *
+   * @param out destination of the length-prefixed payload
+   * @throws IOException if serializing or writing fails
+   */
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    writeToStream(baos);
+    byte[] ba = baos.toByteArray();
+    // writeInt, not write(int): write(int) emits only the low-order byte, while readExternal
+    // consumes a full 4-byte int for the length.
+    out.writeInt(ba.length);
+    out.write(ba);
+  }
+
+  /**
+   * Reads this object from a {@link DataInput} by adapting it to an {@link InputStream} and delegating to
+   * {@link #readFromStream(InputStream)}.
+   *
+   * @param input source to read from
+   * @throws IOException if reading fails
+   */
+  @Override
+  public void read(DataInput input) throws IOException {
+    readFromStream(DataInputInputStream.constructInputStream(input));
+  }
+
+  /**
+   * Writes this object to a {@link DataOutput} by adapting it to an {@link OutputStream} and delegating to
+   * {@link #writeToStream(OutputStream)}.
+   *
+   * @param output destination to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void write(DataOutput output) throws IOException {
+    writeToStream(DataOutputOutputStream.constructOutputStream(output));
+  }
+
+  /**
+   * Reads this object from a stream by wrapping it in a {@link DataInputStream} and delegating to
+   * {@link #read(DataInput)}.
+   *
+   * @param input stream to read from
+   * @throws IOException if reading fails
+   */
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    read(new DataInputStream(input));
+  }
+
+  /**
+   * Writes this object to a stream by wrapping it in a {@link DataOutputStream} and delegating to
+   * {@link #write(DataOutput)}.
+   *
+   * @param output stream to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    write(new DataOutputStream(output));
+  }
+
+  /**
+   * Reads the length-prefixed payload written by {@link #writeExternal(ObjectOutput)} and hands it to
+   * {@link #readFromStream(InputStream)}.
+   *
+   * @param in source of the length-prefixed payload
+   * @throws IOException if reading or deserializing fails
+   * @throws ClassNotFoundException declared by the overridden method; not thrown directly here
+   */
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    int len = in.readInt();
+    byte[] bytes = new byte[len];
+    in.readFully(bytes);
+    readFromStream(new ByteArrayInputStream(bytes));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
deleted file mode 100644
index 53b2bfa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.hazelcast.core.HazelcastInstance;
-
-public class ProtoBufImpl {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
-
- public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
- public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);}
- public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
- }
-
- public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
- public HFragmentHandle() {super(FragmentHandle.PARSER);}
- public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
- }
-
- public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
- public HPlanFragment() {super(PlanFragment.PARSER);}
- public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
- }
-
- public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
- public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
- public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);}
- public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);}
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
deleted file mode 100644
index d55c133..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
-
-public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
-
- T value;
- final Parser<T> parser;
-
- public ProtoBufWrap(Parser<T> parser){
- this(null, parser);
- }
-
- public ProtoBufWrap(T value, Parser<T> parser){
- this.value = value;
- this.parser = parser;
- }
-
- @Override
- public void readData(ObjectDataInput arg0) throws IOException {
- int len = arg0.readShort();
- byte[] b = new byte[len];
- arg0.readFully(b);
- this.value = parser.parseFrom(b);
- }
-
- @Override
- public void writeData(ObjectDataOutput arg0) throws IOException {
- byte[] b = value.toByteArray();
- if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
- arg0.writeShort(b.length);
- arg0.write(b);
- }
-
- protected T get() {
- return value;
- }
-
- protected void set(T value) {
- this.value = value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
deleted file mode 100644
index 1de1c4e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IMap;
-
-public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
-
- private IMap<HK, HV> hzMap;
-
- public ProtoMap(HazelcastInstance instance, String mapName){
- hzMap = instance.getMap(mapName);
- }
-
- public V get(K key){
- Preconditions.checkNotNull(key);
- HK hk = getNewKey(key);
- HV hv = hzMap.get(hk);
- if(hv == null) return null;
- return hv.get();
- }
-
- public V put(K key, V value){
- Preconditions.checkNotNull(key);
- Preconditions.checkNotNull(value);
- HV oldValue = hzMap.put(getNewKey(key), getNewValue(value));
- return oldValue == null ? null : oldValue.get();
- }
-
- public abstract HK getNewKey(K key);
- public abstract HV getNewValue(V key);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
new file mode 100644
index 0000000..1538a85
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Stream-serializable holder for a single protobuf message of type {@code V}.
+ * The message is written with its own {@code writeTo} and read back with the parser given at construction.
+ */
+public abstract class ProtoSerializable<V extends Message> extends AbstractStreamSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoSerializable.class);
+
+  private Parser<V> protoParser;
+  private V obj;
+
+  /**
+   * @param protoParser parser used by {@link #readFromStream(InputStream)}
+   * @param obj message to hold; the no-arg constructors of the nested subclasses pass null
+   */
+  ProtoSerializable(Parser<V> protoParser, V obj) {
+    this.protoParser = protoParser;
+    this.obj = obj;
+  }
+
+  /** Serializes the held message to {@code output}. */
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    this.obj.writeTo(output);
+  }
+
+  /** Replaces the held message with the one parsed from {@code input}. */
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    this.obj = this.protoParser.parseFrom(input);
+  }
+
+  /** Returns the held message, which is null until one is supplied or read. */
+  public V getObject(){
+    return this.obj;
+  }
+
+  /** {@link ProtoSerializable} bound to {@link PlanFragment}. */
+  public static class PlanFragmentSerializable extends ProtoSerializable<PlanFragment>{
+    public PlanFragmentSerializable(){
+      super(PlanFragment.PARSER, null);
+    }
+
+    public PlanFragmentSerializable(PlanFragment obj) {
+      super(PlanFragment.PARSER, obj);
+    }
+  }
+
+  /** {@link ProtoSerializable} bound to {@link FragmentHandle}. */
+  public static class FragmentHandleSerializable extends ProtoSerializable<FragmentHandle>{
+    public FragmentHandleSerializable(){
+      super(FragmentHandle.PARSER, null);
+    }
+
+    public FragmentHandleSerializable(FragmentHandle obj) {
+      super(FragmentHandle.PARSER, obj);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 073a8d5..bb3f527 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -19,18 +19,18 @@ package org.apache.drill.exec.cache;
import io.netty.buffer.ByteBuf;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorAccessible;
@@ -43,19 +43,17 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
/**
* A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
* from an InputStream and construct a new VectorContainer.
*/
-public class VectorAccessibleSerializable implements DrillSerializable {
+public class VectorAccessibleSerializable extends AbstractStreamSerializable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
static final MetricRegistry metrics = DrillMetrics.getInstance();
static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
- private VectorAccessible va;
+ private VectorContainer va;
private WritableBatch batch;
private BufferAllocator allocator;
private int recordCount = -1;
@@ -91,10 +89,6 @@ public class VectorAccessibleSerializable implements DrillSerializable {
}
}
- @Override
- public void readData(ObjectDataInput input) throws IOException {
- readFromStream(DataInputInputStream.constructInputStream(input));
- }
/**
* Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exits
@@ -134,10 +128,6 @@ public class VectorAccessibleSerializable implements DrillSerializable {
va = container;
}
- @Override
- public void writeData(ObjectDataOutput output) throws IOException {
- writeToStream(DataOutputOutputStream.constructOutputStream(output));
- }
public void writeToStreamAndRetain(OutputStream output) throws IOException {
retain = true;
@@ -208,7 +198,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
}
}
- public VectorAccessible get() {
+ public VectorContainer get() {
return va;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
new file mode 100644
index 0000000..bac2323
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.*;
+
+/**
+ * Hazelcast {@link StreamSerializer} for {@link VectorAccessibleSerializable}. It delegates to the object's own
+ * stream methods so that such objects can be put in the HazelCast implementation of Distributed Cache.
+ */
+public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
+
+  // Handed to every VectorAccessibleSerializable created by read().
+  private final BufferAllocator allocator;
+
+  /**
+   * @param allocator allocator passed to each deserialized {@link VectorAccessibleSerializable}
+   */
+  public HCVectorAccessibleSerializer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Creates a new {@link VectorAccessibleSerializable} and populates it from {@code in}.
+   *
+   * @param in Hazelcast input to read from
+   * @return the populated object
+   * @throws IOException if reading fails
+   */
+  @Override
+  public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
+    VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+    va.readFromStream(DataInputInputStream.constructInputStream(in));
+    return va;
+  }
+
+  /**
+   * Writes {@code va} to {@code out} using its own stream serialization.
+   *
+   * @param out Hazelcast output to write to
+   * @param va object to serialize
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
+    va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
+  }
+
+  /** No resources are held by this serializer, so there is nothing to release. */
+  @Override
+  public void destroy() {}
+
+  /**
+   * Returns the fixed serializer type id 1.
+   * NOTE(review): presumably this must not clash with other serializers registered with Hazelcast - confirm.
+   */
+  @Override
+  public int getTypeId() {
+    return 1;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
new file mode 100644
index 0000000..06518b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.Counter;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.cache.DrillSerializable;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HWorkQueueStatus;
+import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.SerializerConfig;
+import com.hazelcast.core.DuplicateInstanceNameException;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+public class HazelCache implements DistributedCache {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
+
+ private final String instanceName;
+ private HazelcastInstance instance;
+ private ITopic<HWorkQueueStatus> workQueueLengths;
+ private HandlePlan fragments;
+ private Cache<WorkQueueStatus, Integer> endpoints;
+ private BufferAllocator allocator;
+ private DrillConfig config;
+
+  /**
+   * Captures the configuration and allocator. No Hazelcast instance is created here; that happens in
+   * {@link #run()}.
+   *
+   * @param config Drill configuration; {@link ExecConstants#SERVICE_NAME} supplies the instance name
+   * @param allocator allocator later passed to the vector serializer
+   */
+  public HazelCache(DrillConfig config, BufferAllocator allocator) {
+    this.config = config;
+    this.allocator = allocator;
+    this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
+  }
+
+  /** Registers {@code serializer} as the Hazelcast serializer for instances of {@code clazz}. */
+  private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
+    SerializerConfig serializerConfig = new SerializerConfig()
+        .setImplementation(serializer)
+        .setTypeClass(clazz);
+    c.getSerializationConfig().addSerializerConfig(serializerConfig);
+  }
+
+  /**
+   * Registers a Jackson-based serializer for the class described by {@code d}, using the object mapper of
+   * this cache's configuration.
+   */
+  @SuppressWarnings("rawtypes")
+  private <T> void addJSer(Config c, SerializationDefinition d){
+    SerializerConfig serializerConfig = new SerializerConfig()
+        .setImplementation(new JacksonAdvancedSerializer(d, config.getMapper()))
+        .setTypeClass(d.clazz);
+    c.getSerializationConfig().addSerializerConfig(serializerConfig);
+  }
+
+
+ private class Listener implements MessageListener<HWorkQueueStatus>{
+
+ @Override
+ public void onMessage(Message<HWorkQueueStatus> wrapped) {
+ logger.debug("Received new queue length message.");
+ endpoints.put(wrapped.getMessageObject().get(), 0);
+ }
+
+ }
+
+  /**
+   * Builds the Hazelcast configuration (serializers, instance and group name, network interfaces), obtains
+   * the Hazelcast instance, and subscribes to the "queue-length" topic.
+   */
+  public void run() {
+    Config c = new Config();
+    addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
+    addJSer(c, SerializationDefinition.OPTION);
+    addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
+
+    c.setInstanceName(instanceName);
+    c.getGroupConfig().setName(instanceName);
+    // Read the subnets from the configuration this cache was constructed with. Loading a fresh default
+    // configuration here would silently ignore any overrides the caller supplied.
+    for (String s : config.getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
+      logger.debug("Adding interface: {}", s);
+      c.getNetworkConfig().getInterfaces().setEnabled(true).addInterface(s);
+    }
+
+    instance = getInstanceOrCreateNew(c);
+    workQueueLengths = instance.getTopic("queue-length");
+    fragments = new HandlePlan(instance);
+    endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
+    workQueueLengths.addMessageListener(new Listener());
+  }
+
+  /**
+   * Returns the already-running Hazelcast instance whose name matches {@code instanceName}, or creates a
+   * new one from {@code c}. If creation fails because the name is already taken, the lookup is retried.
+   */
+  private HazelcastInstance getInstanceOrCreateNew(Config c) {
+    while (true) {
+      for (HazelcastInstance existing : Hazelcast.getAllHazelcastInstances()) {
+        if (existing.getName().equals(this.instanceName)) {
+          return existing;
+        }
+      }
+      try {
+        return Hazelcast.newHazelcastInstance(c);
+      } catch (DuplicateInstanceNameException ignored) {
+        // Presumably an instance with this name appeared between the lookup and the creation attempt;
+        // go around again and pick it up.
+      }
+    }
+  }
+
+// @Override
+// public void updateLocalQueueLength(int length) {
+// workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
+// .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
+// }
+//
+// @Override
+// public List<WorkQueueStatus> getQueueLengths() {
+// return Lists.newArrayList(endpoints.asMap().keySet());
+// }
+
+  /**
+   * Shuts down the Hazelcast instance obtained in {@link #run()}, if there is one.
+   *
+   * @throws IOException declared by the interface; not thrown by this implementation
+   */
+  @Override
+  public void close() throws IOException {
+    // run() may never have been called, or may have failed before the instance was assigned.
+    if (this.instance != null) {
+      this.instance.getLifecycleService().shutdown();
+    }
+  }
+
+ @Override
+ public PlanFragment getFragment(FragmentHandle handle) {
+ return this.fragments.get(handle);
+ }
+
+ @Override
+ public void storeFragment(PlanFragment fragment) {
+ fragments.put(fragment.getHandle(), fragment);
+ }
+
+
+  /** Returns a multimap view over the Hazelcast multimap named after {@code clazz.toString()}. */
+  @Override
+  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
+    String mapName = clazz.toString();
+    com.hazelcast.core.MultiMap<String, V> backing = instance.getMultiMap(mapName);
+    return new HCDistributedMultiMapImpl<V>(backing, clazz);
+  }
+
+ @Override
+ public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
+ return getNamedMap(clazz.getName(), clazz);
+ }
+
+
+  /**
+   * Returns the distributed map called {@code name}, configured with no backups and with reads from backup
+   * data enabled.
+   *
+   * @param name name of the Hazelcast map
+   * @param clazz value type of the map
+   * @return a wrapper around the Hazelcast map
+   */
+  @Override
+  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
+    MapConfig myMapConfig = new MapConfig();
+    myMapConfig.setName(name);
+    myMapConfig.setBackupCount(0);
+    myMapConfig.setReadBackupData(true);
+    // Register the config under the map's own name and before the map is first requested. Registering it
+    // under clazz.toString() after the lookup meant it never applied to the map returned here.
+    instance.getConfig().getMapConfigs().put(name, myMapConfig);
+    IMap<String, V> imap = this.instance.getMap(name);
+    return new HCDistributedMapImpl<V>(imap);
+  }
+
+ @Override
+ public Counter getCounter(String name) {
+ return new HCCounterImpl(this.instance.getAtomicLong(name));
+ }
+
+
+
+  /** {@link DistributedMap} that forwards every operation to a Hazelcast {@link IMap}. */
+  public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
+    private final IMap<String, V> m;
+
+    public HCDistributedMapImpl(IMap<String, V> m) {
+      this.m = m;
+    }
+
+    @Override
+    public V get(String key) {
+      return this.m.get(key);
+    }
+
+    @Override
+    public void put(String key, V value) {
+      this.m.put(key, value);
+    }
+
+    @Override
+    public void putIfAbsent(String key, V value) {
+      this.m.putIfAbsent(key, value);
+    }
+
+    @Override
+    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
+      this.m.putIfAbsent(key, value, ttl, timeunit);
+    }
+
+    /** Iterates over the entry set of the underlying Hazelcast map. */
+    @Override
+    public Iterator<Entry<String, V>> iterator() {
+      return this.m.entrySet().iterator();
+    }
+  }
+
+  /** {@link DistributedMultiMap} backed by a Hazelcast multimap. */
+  public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
+    private com.hazelcast.core.MultiMap<String, V> mmap;
+
+    /**
+     * @param mmap Hazelcast multimap to delegate to
+     * @param clazz value type; not used by this implementation
+     */
+    public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
+      this.mmap = mmap;
+    }
+
+    /** Returns a fresh list holding a copy of the values currently stored under {@code key}. */
+    @Override
+    public Collection<V> get(String key) {
+      Collection<V> stored = mmap.get(key);
+      List<V> copy = Lists.newArrayList(stored);
+      return copy;
+    }
+
+    @Override
+    public void put(String key, V value) {
+      this.mmap.put(key, value);
+    }
+  }
+
+  /** {@link Counter} that delegates to a Hazelcast {@link IAtomicLong}. */
+  public static class HCCounterImpl implements Counter {
+    private IAtomicLong n;
+
+    public HCCounterImpl(IAtomicLong n) {
+      this.n = n;
+    }
+
+    @Override
+    public long get() {
+      return this.n.get();
+    }
+
+    @Override
+    public long incrementAndGet() {
+      return this.n.incrementAndGet();
+    }
+
+    @Override
+    public long decrementAndGet() {
+      return this.n.decrementAndGet();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
new file mode 100644
index 0000000..d992aa7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.hazelcast.core.HazelcastInstance;
+
+public class ProtoBufImpl {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
+
+ public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
+ public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);}
+ public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
+ }
+
+ public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
+ public HFragmentHandle() {super(FragmentHandle.PARSER);}
+ public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
+ }
+
+ public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
+ public HPlanFragment() {super(PlanFragment.PARSER);}
+ public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
+ }
+
+ public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
+ public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
+ public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);}
+ public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
new file mode 100644
index 0000000..23a4e08
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import java.io.IOException;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+
+/**
+ * Hazelcast {@link DataSerializable} holder for a single protobuf message. On the wire the message is a
+ * short length prefix followed by the message bytes.
+ */
+public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
+
+  T value;
+  final Parser<T> parser;
+
+  /** Creates an empty wrapper whose value is filled in later, e.g. by {@link #readData}. */
+  public ProtoBufWrap(Parser<T> parser){
+    this(null, parser);
+  }
+
+  public ProtoBufWrap(T value, Parser<T> parser){
+    this.parser = parser;
+    this.value = value;
+  }
+
+  /** Reads the length prefix, then exactly that many bytes, and parses them into the wrapped value. */
+  @Override
+  public void readData(ObjectDataInput arg0) throws IOException {
+    final byte[] payload = new byte[arg0.readShort()];
+    arg0.readFully(payload);
+    this.value = parser.parseFrom(payload);
+  }
+
+  /**
+   * Writes the wrapped value as a length prefix followed by its bytes.
+   *
+   * @throws IOException if the encoded message is longer than {@link Short#MAX_VALUE} bytes
+   */
+  @Override
+  public void writeData(ObjectDataOutput arg0) throws IOException {
+    final byte[] payload = value.toByteArray();
+    final int length = payload.length;
+    if (length > Short.MAX_VALUE) {
+      throw new IOException("Unexpectedly long value.");
+    }
+    arg0.writeShort(length);
+    arg0.write(payload);
+  }
+
+  protected T get() {
+    return this.value;
+  }
+
+  protected void set(T value) {
+    this.value = value;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
new file mode 100644
index 0000000..72d793a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+/**
+ * Map of protobuf messages stored in a Hazelcast {@link IMap}. Keys and values are wrapped in
+ * {@link ProtoBufWrap} subclasses before they are handed to Hazelcast and unwrapped on the way out.
+ */
+public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
+
+  private IMap<HK, HV> hzMap;
+
+  public ProtoMap(HazelcastInstance instance, String mapName){
+    this.hzMap = instance.getMap(mapName);
+  }
+
+  /**
+   * @param key key to look up; must not be null
+   * @return the stored value, or null when the map has no entry for {@code key}
+   */
+  public V get(K key){
+    Preconditions.checkNotNull(key);
+    HV wrapped = hzMap.get(getNewKey(key));
+    return wrapped == null ? null : wrapped.get();
+  }
+
+  /**
+   * @param key key to store under; must not be null
+   * @param value value to store; must not be null
+   * @return the value previously stored under {@code key}, or null if there was none
+   */
+  public V put(K key, V value){
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+    HV previous = hzMap.put(getNewKey(key), getNewValue(value));
+    if (previous == null) {
+      return null;
+    }
+    return previous.get();
+  }
+
+  /** Wraps a raw key for storage in Hazelcast. */
+  public abstract HK getNewKey(K key);
+
+  /** Wraps a raw value for storage in Hazelcast. */
+  public abstract HV getNewValue(V key);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
new file mode 100644
index 0000000..b26be7d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.infinispan;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.Counter;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.cache.DrillSerializable;
+import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable;
+import org.apache.drill.exec.cache.ProtoSerializable.PlanFragmentSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.infinispan.Cache;
+import org.infinispan.atomic.Delta;
+import org.infinispan.atomic.DeltaAware;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfiguration;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
+import org.jgroups.blocks.atomic.CounterService;
+import org.jgroups.fork.ForkChannel;
+import org.jgroups.protocols.COUNTER;
+import org.jgroups.protocols.FRAG2;
+import org.jgroups.stack.ProtocolStack;
+
+public class ICache implements DistributedCache{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class);
+
+ private EmbeddedCacheManager manager;
+ private ForkChannel cacheChannel;
+ private final CounterService counters;
+ private final Cache<FragmentHandleSerializable, PlanFragmentSerializable> fragments;
+
+ /**
+  * Builds the Infinispan cache manager, the fragment cache, and the JGroups fork channel that backs the
+  * counter service. The fork channel is created here but only connected in {@link #run()}.
+  *
+  * @param config Drill configuration; {@link ExecConstants#SERVICE_NAME} is used as the cluster name
+  * @param allocator allocator passed to the {@link VAAdvancedExternalizer}
+  * @throws Exception if any of the Infinispan or JGroups setup calls fail
+  */
+ public ICache(DrillConfig config, BufferAllocator allocator) throws Exception {
+ String clusterName = config.getString(ExecConstants.SERVICE_NAME);
+ // Global (per cache manager) settings: default transport with our cluster name, plus the externalizer.
+ GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+ gcb.transport() //
+ .defaultTransport().clusterName(clusterName).build();
+ gcb.serialization() //
+ .addAdvancedExternalizer(new VAAdvancedExternalizer(allocator));
+
+ GlobalConfiguration gc = gcb.build();
+ // Default cache configuration handed to the manager: DIST_ASYNC clustering, entries stored as binary.
+ Configuration c = new ConfigurationBuilder() //
+ .clustering() //
+ .cacheMode(CacheMode.DIST_ASYNC) //
+ .storeAsBinary() //
+ .build();
+ this.manager = new DefaultCacheManager(gc, c);
+ // NOTE(review): the cache named "first" is only used to reach the JGroups transport; presumably asking
+ // for it is what forces the transport to exist - confirm.
+ JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
+ // Fork channel on top of Infinispan's own channel, adding a COUNTER protocol relative to FRAG2.
+ this.cacheChannel = new ForkChannel(transport.getChannel(), "drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new COUNTER());
+ this.fragments = manager.getCache(PlanFragment.class.getName());
+ this.counters = new CounterService(this.cacheChannel);
+ }
+
+  /** Stops the embedded cache manager. */
+  @Override
+  public void close() throws IOException {
+    this.manager.stop();
+  }
+
+  /**
+   * Connects the JGroups fork channel that backs the counter service.
+   *
+   * @throws DrillbitStartupException if the channel cannot be connected; the underlying failure is
+   *         attached as the cause
+   */
+  @Override
+  public void run() throws DrillbitStartupException {
+    try {
+      cacheChannel.connect("c1");
+    } catch (Exception e) {
+      // Keep the original exception as the cause so the reason for the startup failure is not lost.
+      throw new DrillbitStartupException("Failure while trying to set up JGroups.", e);
+    }
+  }
+
+  /** Returns the plan fragment cached under {@code handle}, or null when there is no entry for it. */
+  @Override
+  public PlanFragment getFragment(FragmentHandle handle) {
+    FragmentHandleSerializable key = new FragmentHandleSerializable(handle);
+    PlanFragmentSerializable stored = fragments.get(key);
+    return stored == null ? null : stored.getObject();
+  }
+
+ @Override
+ public void storeFragment(PlanFragment fragment) {
+ fragments.put(new FragmentHandleSerializable(fragment.getHandle()), new PlanFragmentSerializable(fragment));
+ }
+
+  /** Returns a multimap view over the Infinispan cache named after the fully-qualified name of {@code clazz}. */
+  @Override
+  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
+    String cacheName = clazz.getName();
+    Cache<String, DeltaList<V>> backing = manager.getCache(cacheName);
+    return new IMulti<V>(backing, clazz);
+  }
+
+  /** Returns a map view over the Infinispan cache named after the fully-qualified name of {@code clazz}. */
+  @Override
+  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
+    String cacheName = clazz.getName();
+    Cache<String, V> backing = manager.getCache(cacheName);
+    return new IMap<V>(backing);
+  }
+
+ @Override
+ public Counter getCounter(String name) {
+ return new JGroupsCounter(counters.getOrCreateCounter(name, 0));
+ }
+
+  /** {@link Counter} that delegates to a JGroups counter. */
+  private class JGroupsCounter implements Counter{
+    final org.jgroups.blocks.atomic.Counter inner;
+
+    public JGroupsCounter(org.jgroups.blocks.atomic.Counter inner) {
+      this.inner = inner;
+    }
+
+    @Override
+    public long get() {
+      return this.inner.get();
+    }
+
+    @Override
+    public long incrementAndGet() {
+      return this.inner.incrementAndGet();
+    }
+
+    @Override
+    public long decrementAndGet() {
+      return this.inner.decrementAndGet();
+    }
+
+  }
+
+  /** {@link DistributedMap} that forwards every operation to an Infinispan cache. */
+  private class IMap<V extends DrillSerializable> implements DistributedMap<V>{
+
+    private Cache<String, V> cache;
+
+    public IMap(Cache<String, V> cache) {
+      this.cache = cache;
+    }
+
+    @Override
+    public V get(String key) {
+      return this.cache.get(key);
+    }
+
+    @Override
+    public void put(String key, V value) {
+      this.cache.put(key, value);
+    }
+
+    @Override
+    public void putIfAbsent(String key, V value) {
+      this.cache.putIfAbsent(key, value);
+    }
+
+    @Override
+    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
+      this.cache.putIfAbsent(key, value, ttl, timeUnit);
+    }
+
+  }
+
+  /**
+   * {@link DistributedMultiMap} backed by an Infinispan cache whose values are {@link DeltaList}s.
+   */
+  private class IMulti<V extends DrillSerializable> implements DistributedMultiMap<V>{
+
+    private Cache<String, DeltaList<V>> cache;
+    private Class<V> clazz;
+
+    public IMulti(Cache<String, DeltaList<V>> cache, Class<V> clazz) {
+      this.cache = cache;
+      this.clazz = clazz;
+    }
+
+    /** Returns the list stored under {@code key}, exactly as the cache returns it. */
+    @Override
+    public Collection<V> get(String key) {
+      return this.cache.get(key);
+    }
+
+    /**
+     * Stores a new single-element list under {@code key}.
+     * NOTE(review): this is a plain cache put, so it looks like it replaces whatever list was stored
+     * before rather than appending to it - confirm against the intended multimap semantics.
+     */
+    @Override
+    public void put(String key, V value) {
+      DeltaList<V> single = new DeltaList<V>(value);
+      this.cache.put(key, single);
+// cache.getAdvancedCache().applyDelta(key, new DeltaList<V>(value), key);
+    }
+
+  }
+
+
+
+
+  /**
+   * Linked list that acts as both the {@link DeltaAware} value and its own {@link Delta}.
+   */
+  private static class DeltaList<V extends DrillSerializable> extends LinkedList<V> implements DeltaAware, Delta{
+
+    /** The serialVersionUID */
+    private static final long serialVersionUID = 2176345973026460708L;
+
+    public DeltaList(Collection<? extends V> c) {
+      super(c);
+    }
+
+    public DeltaList(V obj) {
+      add(obj);
+    }
+
+    /** Returns a copy of the current contents as the delta. */
+    @Override
+    public Delta delta() {
+      return new DeltaList<V>(this);
+    }
+
+    /** Empties this list. */
+    @Override
+    public void commit() {
+      clear();
+    }
+
+    /**
+     * Appends this list's elements to {@code d} and returns {@code d}; when {@code d} is null or not a
+     * {@code DeltaList}, returns this list unchanged.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public DeltaAware merge(DeltaAware d) {
+      // instanceof is false for null, so this also covers the null case.
+      if (!(d instanceof DeltaList)) {
+        return this;
+      }
+      List<V> target = (List<V>) d;
+      for (V element : this) {
+        target.add(element);
+      }
+      return (DeltaAware) target;
+    }
+  }
+
+
+// public void run() {
+// Config c = new Config();
+// SerializerConfig sc = new SerializerConfig() //
+// .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
+// .setTypeClass(VectorAccessibleSerializable.class);
+// c.setInstanceName(instanceName);
+// c.getSerializationConfig().addSerializerConfig(sc);
+// instance = getInstanceOrCreateNew(c);
+// workQueueLengths = instance.getTopic("queue-length");
+// fragments = new HandlePlan(instance);
+// endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
+// workQueueLengths.addMessageListener(new Listener());
+// }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
new file mode 100644
index 0000000..5f54f74
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.infinispan;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Set;
+
+import org.apache.drill.exec.cache.CachedVectorContainer;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.infinispan.commons.marshall.AdvancedExternalizer;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Infinispan externalizer for {@link CachedVectorContainer}. The wire format is an int length followed by
+ * that many bytes of container data.
+ */
+public class VAAdvancedExternalizer implements AdvancedExternalizer<CachedVectorContainer> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VAAdvancedExternalizer.class);
+
+  /** Allocator handed to every container created while reading. */
+  private final BufferAllocator allocator;
+
+
+  public VAAdvancedExternalizer(BufferAllocator allocator) {
+    super();
+    this.allocator = allocator;
+  }
+
+  // The double cast only widens the element type of an immutable set, so it is safe.
+  @SuppressWarnings("unchecked")
+  static final Set<Class<? extends CachedVectorContainer>> CLASSES = //
+      (Set<Class<? extends CachedVectorContainer>>) //
+      (Object) ImmutableSet.of(CachedVectorContainer.class);
+
+  /**
+   * Reads the length prefix and then exactly that many bytes, and wraps them in a new container.
+   *
+   * @throws IOException if the stream ends before the announced number of bytes has been read
+   */
+  @Override
+  public CachedVectorContainer readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+    int length = in.readInt();
+    byte[] b = new byte[length];
+    // read(byte[]) may return after filling only part of the buffer; readFully blocks until all
+    // bytes have arrived or fails with EOFException.
+    in.readFully(b);
+    return new CachedVectorContainer(b, allocator);
+  }
+
+  /** Writes the container's data as an int length followed by the bytes. */
+  @Override
+  public void writeObject(ObjectOutput out, CachedVectorContainer va) throws IOException {
+    byte[] data = va.getData();
+    out.writeInt(data.length);
+    out.write(data);
+  }
+
+  @Override
+  public Integer getId() {
+    // magic number for this class, assume drill uses 3001-3100.
+    return 3001;
+  }
+
+  @Override
+  public Set<Class<? extends CachedVectorContainer>> getTypeClasses() {
+    return CLASSES;
+  }
+}