You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/23 08:09:16 UTC
[2/3] git commit: Several changes: - added base system test capable
of booting a drillbit cluster - added a client sys test that submits a plan -
corrected a bug in ZKClusterCoordinator where endpoints would not get fetched
before a cacheChanged event - c
Several changes:
- added base system test capable of booting a drillbit cluster
- added a client sys test that submits a plan
- corrected a bug in ZKClusterCoordinator where endpoints would not get fetched before a cacheChanged event
- corrected a bug in HazelCache where it wouldn't support fetching an existing HazelCastInstance
- added a mini zookeeper cluster based on HBase's one but without the hadoop dependency
- made sure all tests pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/31fb6eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/31fb6eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/31fb6eb0
Branch: refs/heads/execwork
Commit: 31fb6eb0124ac8801e27b32bf54ff7a34ce7d85a
Parents: 4f3a1c6
Author: David Ribeiro Alves <da...@gmail.com>
Authored: Thu Apr 18 22:03:39 2013 -0500
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Thu Apr 18 22:03:39 2013 -0500
----------------------------------------------------------------------
.../apache/drill/common/config/DrillConfig.java | 12 +-
.../apache/drill/common/config/NestedConfig.java | 16 +-
.../org/apache/drill/exec/cache/HazelCache.java | 11 +-
.../org/apache/drill/exec/client/DrillClient.java | 100 ++++
.../drill/exec/coord/ClusterCoordinator.java | 14 +-
.../drill/exec/coord/ZKClusterCoordinator.java | 137 +++---
.../org/apache/drill/exec/server/Drillbit.java | 20 +-
.../apache/drill/exec/server/DrillbitContext.java | 3 +-
.../java-exec/src/main/resources/drill-module.conf | 1 +
.../org/apache/drill/exec/DrillSystemTestBase.java | 77 +++-
.../drill/exec/client/DrillClientSystemTest.java | 42 ++-
.../drill/exec/util/MiniZooKeeperCluster.java | 368 +++++++++++++++
.../java-exec/src/test/resources/drill-module.conf | 28 ++
.../java-exec/src/test/resources/simple_plan.json | 133 ++++++
14 files changed, 861 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index b738002..dc9327e 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.media.jfxmedia.events.VideoTrackSizeListener;
import org.apache.drill.common.exceptions.DrillConfigurationException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.StorageEngineConfigBase;
@@ -44,8 +46,9 @@ public final class DrillConfig extends NestedConfig{
@SuppressWarnings("unchecked")
private volatile List<Queue<Object>> sinkQueues = new CopyOnWriteArrayList<Queue<Object>>(new Queue[1]);
-
- private DrillConfig(Config config) {
+
+ @VisibleForTesting
+ public DrillConfig(Config config) {
super(config);
mapper = new ObjectMapper();
SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule").addDeserializer(LogicalExpression.class, new LogicalExpression.De(this));
@@ -145,4 +148,9 @@ public final class DrillConfig extends NestedConfig{
public ObjectMapper getMapper(){
return mapper;
}
+
+ @Override
+ public String toString(){
+ return this.root().render();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
index b8d6133..6954707 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
@@ -17,18 +17,12 @@
******************************************************************************/
package org.apache.drill.common.config;
+import com.typesafe.config.*;
+
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigList;
-import com.typesafe.config.ConfigMergeable;
-import com.typesafe.config.ConfigObject;
-import com.typesafe.config.ConfigOrigin;
-import com.typesafe.config.ConfigResolveOptions;
-import com.typesafe.config.ConfigValue;
-
abstract class NestedConfig implements Config {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class);
@@ -187,15 +181,15 @@ abstract class NestedConfig implements Config {
}
public Config atPath(String path) {
- return null;
+ return c.atPath(path);
}
public Config atKey(String key) {
- return null;
+ return c.atKey(key);
}
public Config withValue(String path, ConfigValue value) {
- return null;
+ return c.withValue(path, value);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index cc73799..b7477a3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -65,9 +65,8 @@ public class HazelCache implements DistributedCache {
public void run(DrillbitEndpoint endpoint) {
Config c = new Config();
- // todo, utilize cluster member ship to set up other nodes.
c.setInstanceName(instanceName);
- instance = Hazelcast.newHazelcastInstance(c);
+ instance = getInstanceOrCreateNew(c);
workQueueLengths = instance.getTopic("queue-length");
optimizedPlans = instance.getMap("plan-optimizations");
this.endpoint = endpoint;
@@ -75,6 +74,14 @@ public class HazelCache implements DistributedCache {
workQueueLengths.addMessageListener(new Listener());
}
+ private HazelcastInstance getInstanceOrCreateNew(Config c) {
+ for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
+ if (instance.getName().equals(this.instanceName))
+ return instance;
+ }
+ return Hazelcast.newHazelcastInstance(c);
+ }
+
@Override
public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical) {
optimizedPlans.put(logical, physical);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
new file mode 100644
index 0000000..e0f3347
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -0,0 +1,100 @@
+package org.apache.drill.exec.client;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.UserClient;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.get;
+import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
+import static io.netty.buffer.Unpooled.copiedBuffer;
+import static java.nio.charset.Charset.forName;
+import static org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import static org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
+import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
+
+/**
+ * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
+ */
+public class DrillClient {
+
+ DrillConfig config;
+ private UserClient client;
+ private ClusterCoordinator clusterCoordinator;
+
+ public DrillClient() {
+ this(DrillConfig.create());
+ }
+
+ public DrillClient(String fileName) {
+ this(DrillConfig.create(fileName));
+ }
+
+ public DrillClient(DrillConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Connects the client to a Drillbit server
+ *
+ * @throws IOException
+ */
+ public void connect() throws Exception {
+ this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+ this.clusterCoordinator.start();
+ Thread.sleep(10000);
+ Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints();
+ checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
+ // just use the first endpoint for now
+ DrillbitEndpoint endpoint = get(endpoints, 0);
+ ByteBufAllocator bb = new PooledByteBufAllocator(true);
+ this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+ try {
+ this.client.connectAsClient(endpoint.getAddress(), endpoint.getUserPort());
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Closes this client's connection to the server
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ this.client.close();
+ }
+
+ /**
+ * Submits a Logical plan for direct execution (bypasses parsing)
+ *
+ * @param plan the plan to execute
+ * @return a handle for the query result
+ * @throws RpcException
+ */
+ public DrillRpcFuture<QueryHandle> submitPlan(String plan) throws RpcException {
+ return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).setPlan(plan).build(), EMPTY_BUFFER);
+ }
+
+ /**
+ * Submits a Query for parsing and execution
+ *
+ * @param query the query to execute
+ * @return a handle for the query result
+ * @throws RpcException
+ */
+ public DrillRpcFuture<QueryHandle> submitQuery(String query) throws RpcException {
+ return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).build(), copiedBuffer(query, forName("UTF-8")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index 9c7eab2..d7ea8fa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -17,11 +17,11 @@
******************************************************************************/
package org.apache.drill.exec.coord;
-import java.io.Closeable;
-import java.util.List;
-
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import java.io.Closeable;
+import java.util.Collection;
+
/**
* Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
* as well as understand other node's existence and capabilities.
@@ -36,10 +36,12 @@ public abstract class ClusterCoordinator implements Closeable {
public abstract void unregister(RegistrationHandle handle);
/**
- * Get a list of avialable Drillbit endpoints. Thread-safe. Could be slightly out of date depending on refresh policy.
- * @return A list of available endpoints.
+ * Get a collection of avialable Drillbit endpoints, Thread-safe.
+ * Could be slightly out of date depending on refresh policy.
+ *
+ * @return A collection of available endpoints.
*/
- public abstract List<DrillbitEndpoint> getAvailableEndpoints();
+ public abstract Collection<DrillbitEndpoint> getAvailableEndpoints();
public interface RegistrationHandle {
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
index b3cd27f..3ad08e1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,17 +17,7 @@
******************************************************************************/
package org.apache.drill.exec.coord;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.base.Throwables;
+import com.google.common.base.Function;
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
@@ -38,8 +28,20 @@ import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
import com.netflix.curator.x.discovery.ServiceInstance;
import com.netflix.curator.x.discovery.details.ServiceCache;
import com.netflix.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
-/** Manages cluster coordination utilizing zookeeper. **/
+import static com.google.common.base.Throwables.propagate;
+import static com.google.common.collect.Collections2.transform;
+
+/**
+ * Manages cluster coordination utilizing zookeeper. *
+ */
public class ZKClusterCoordinator extends ClusterCoordinator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKClusterCoordinator.class);
@@ -47,24 +49,26 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
private CuratorFramework curator;
private ServiceDiscovery<DrillbitEndpoint> discovery;
private ServiceCache<DrillbitEndpoint> serviceCache;
- private volatile List<DrillbitEndpoint> endpoints = Collections.emptyList();
+ private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList();
private final String serviceName;
+
public ZKClusterCoordinator(DrillConfig config) throws IOException {
-
+
this.basePath = config.getString(ExecConstants.ZK_ROOT);
- this.serviceName = config.getString(ExecConstants.SERVICE_NAME);
-
+ this.serviceName = config.getString(ExecConstants.SERVICE_NAME);
RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
- config.getInt(ExecConstants.ZK_RETRY_DELAY));
-
+ config.getInt(ExecConstants.ZK_RETRY_DELAY));
curator = CuratorFrameworkFactory.builder()
- .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
- .retryPolicy(rp)
- .connectString(config.getString(ExecConstants.ZK_CONNECTION))
- .build();
-
+ .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+ .retryPolicy(rp)
+ .connectString(config.getString(ExecConstants.ZK_CONNECTION))
+ .build();
discovery = getDiscovery();
- serviceCache = discovery.serviceCacheBuilder().name(serviceName).refreshPaddingMs(config.getInt(ExecConstants.ZK_REFRESH)).build();
+ serviceCache = discovery.
+ serviceCacheBuilder()
+ .name(serviceName)
+ .refreshPaddingMs(config.getInt(ExecConstants.ZK_REFRESH))
+ .build();
}
public void start() throws Exception {
@@ -73,10 +77,11 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
discovery.start();
serviceCache.start();
serviceCache.addListener(new ZKListener());
+ updateEndpoints();
}
-
- private class ZKListener implements ServiceCacheListener{
-
+
+ private class ZKListener implements ServiceCacheListener {
+
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
@@ -84,62 +89,78 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
@Override
public void cacheChanged() {
logger.debug("Cache changed, updating.");
- try {
- Collection<ServiceInstance<DrillbitEndpoint>> instances = discovery.queryForInstances(serviceName);
- List<DrillbitEndpoint> newEndpoints = new ArrayList<DrillbitEndpoint>(instances.size());
- for(ServiceInstance<DrillbitEndpoint> si : instances){
- newEndpoints.add(si.getPayload());
- }
- endpoints = newEndpoints;
- } catch (Exception e) {
- logger.error("Failure while update Drillbit service location cache.", e);
- }
+ updateEndpoints();
}
}
- public void close() throws IOException{
+ public void close() throws IOException {
serviceCache.close();
discovery.close();
curator.close();
}
-
+
@Override
public RegistrationHandle register(DrillbitEndpoint data) {
try {
- ServiceInstance<DrillbitEndpoint> si = getSI(data);
- discovery.registerService(si);
- return new ZKRegistrationHandle(si.getId());
+ ServiceInstance<DrillbitEndpoint> serviceInstance = getServiceInstance(data);
+ discovery.registerService(serviceInstance);
+ return new ZKRegistrationHandle(serviceInstance.getId());
} catch (Exception e) {
- Throwables.propagate(e);
- return null;
+ throw propagate(e);
}
}
@Override
public void unregister(RegistrationHandle handle) {
- if( !( handle instanceof ZKRegistrationHandle)) throw new UnsupportedOperationException("Unknown handle type");
-
+ if (!(handle instanceof ZKRegistrationHandle)) throw new UnsupportedOperationException("Unknown handle type");
+
ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
try {
- ServiceInstance<DrillbitEndpoint> si = ServiceInstance.<DrillbitEndpoint>builder().address("").port(0).id(h.id).name(ExecConstants.SERVICE_NAME).build();
- discovery.unregisterService(si);
+ ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder()
+ .address("")
+ .port(0)
+ .id(h.id)
+ .name(serviceName)
+ .build();
+ discovery.unregisterService(serviceInstance);
} catch (Exception e) {
- Throwables.propagate(e);
+ propagate(e);
}
}
@Override
- public List<DrillbitEndpoint> getAvailableEndpoints() {
+ public Collection<DrillbitEndpoint> getAvailableEndpoints() {
return this.endpoints;
}
-
- private ServiceInstance<DrillbitEndpoint> getSI(DrillbitEndpoint ep) throws Exception{
- return ServiceInstance.<DrillbitEndpoint>builder().name(ExecConstants.SERVICE_NAME).payload(ep).build();
+
+ private void updateEndpoints() {
+ try {
+ endpoints = transform(discovery.queryForInstances(serviceName),
+ new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
+ @Override
+ public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) {
+ return input.getPayload();
+ }
+ });
+ } catch (Exception e) {
+ logger.error("Failure while update Drillbit service location cache.", e);
+ }
}
-
-
+
+ private ServiceInstance<DrillbitEndpoint> getServiceInstance(DrillbitEndpoint endpoint) throws Exception {
+ return ServiceInstance.<DrillbitEndpoint>builder()
+ .name(serviceName)
+ .payload(endpoint)
+ .build();
+ }
+
public ServiceDiscovery<DrillbitEndpoint> getDiscovery() {
- return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath(basePath).client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER).build();
+ return ServiceDiscoveryBuilder
+ .builder(DrillbitEndpoint.class)
+ .basePath(basePath)
+ .client(curator)
+ .serializer(DrillServiceInstanceHelper.SERIALIZER)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 99ebe85..2961fae 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -32,20 +32,24 @@ import org.apache.drill.exec.service.ServiceEngine;
import java.net.InetAddress;
+/**
+ * Starts, tracks and stops all the required services for a Drillbit daemon to work.
+ */
public class Drillbit {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
public static Drillbit start(StartupOptions options) throws DrillbitStartupException {
- Drillbit bit = null;
+ return start(DrillConfig.create(options.getConfigLocation()));
+ }
+
+ public static Drillbit start(DrillConfig config) throws DrillbitStartupException {
+ Drillbit bit;
try {
logger.debug("Setting up Drillbit.");
- DrillConfig config = DrillConfig.create(options.getConfigLocation());
bit = new Drillbit(config);
} catch (Exception ex) {
throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
}
-
-
try {
logger.debug("Starting Drillbit.");
bit.run();
@@ -80,9 +84,11 @@ public class Drillbit {
public void run() throws Exception {
coord.start();
engine.start();
-
- DrillbitEndpoint md = DrillbitEndpoint.newBuilder().setAddress(InetAddress.getLocalHost().getHostAddress())
- .setBitPort(engine.getBitPort()).setUserPort(engine.getUserPort()).build();
+ DrillbitEndpoint md = DrillbitEndpoint.newBuilder()
+ .setAddress(InetAddress.getLocalHost().getHostAddress())
+ .setBitPort(engine.getBitPort())
+ .setUserPort(engine.getUserPort())
+ .build();
handle = coord.register(md);
cache.run(md);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 94c8207..e3a24d2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.server;
import io.netty.channel.nio.NioEventLoopGroup;
+import java.util.Collection;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
@@ -45,7 +46,7 @@ public class DrillbitContext {
return config;
}
- public List<DrillbitEndpoint> getBits(){
+ public Collection<DrillbitEndpoint> getBits(){
return underlyingBit.coord.getAvailableEndpoints();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
index ad18d6e..c516dda 100644
--- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -4,6 +4,7 @@
drill.exec: {
cluster-id: "drillbits1"
rpc: {
+ user.address : localhost
user.port : 31010,
bit.port : 31011
},
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
index 7f5264c..645c4d5 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
@@ -18,27 +18,66 @@
package org.apache.drill.exec;
import com.google.common.collect.ImmutableList;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.StartupOptions;
+import org.apache.drill.exec.util.MiniZooKeeperCluster;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import static com.google.common.base.Throwables.propagate;
/**
* Base class for Drill system tests.
- * Starts one or more Drillbits and provides a configured client for testing.
+ * Starts one or more Drillbits, an embedded ZooKeeper cluster and provides a configured client for testing.
*/
public class DrillSystemTestBase {
- private static List<Drillbit> servers;
+ static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
- public void startCluster(StartupOptions options, int numServers) {
+ private static File testDir = new File("target/test-data");
+ private static DrillConfig config;
+ private static String zkUrl;
+ private static int bitPort;
+ private static int userPort;
+
+ private List<Drillbit> servers;
+ private MiniZooKeeperCluster zkCluster;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ config = DrillConfig.create();
+ bitPort = config.getInt(ExecConstants.INITIAL_BIT_PORT);
+ userPort = config.getInt(ExecConstants.INITIAL_USER_PORT);
+ zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
+ setupTestDir();
+ }
+
+ private static void setupTestDir() {
+ if (!testDir.exists()) {
+ testDir.mkdirs();
+ }
+ }
+
+ private DrillConfig newConfigWithDifferentPorts() {
+ return new DrillConfig(config
+ .withValue(ExecConstants.INITIAL_BIT_PORT, ConfigValueFactory.fromAnyRef(bitPort++))
+ .withValue(ExecConstants.INITIAL_USER_PORT, ConfigValueFactory.fromAnyRef(userPort++)));
+ }
+
+ public void startCluster(int numServers) {
try {
ImmutableList.Builder<Drillbit> servers = ImmutableList.builder();
for (int i = 0; i < numServers; i++) {
- servers.add(Drillbit.start(options));
+ DrillConfig config = newConfigWithDifferentPorts();
+// System.out.println("NEW CONFIG");
+// System.out.println(config);
+ servers.add(Drillbit.start(config));
}
this.servers = servers.build();
} catch (DrillbitStartupException e) {
@@ -46,16 +85,36 @@ public class DrillSystemTestBase {
}
}
- public void startZookeeper() {
-
+ public void startZookeeper(int numServers) {
+ try {
+ this.zkCluster = new MiniZooKeeperCluster();
+ zkCluster.setDefaultClientPort(Integer.parseInt(this.zkUrl.split(":")[1]));
+ zkCluster.startup(testDir, numServers);
+ } catch (IOException e) {
+ propagate(e);
+ } catch (InterruptedException e) {
+ propagate(e);
+ }
}
public void stopCluster() {
-
+ if (servers != null) {
+ for (Drillbit server : servers) {
+ try {
+ server.close();
+ } catch (Exception e) {
+ logger.warn("Error shutting down Drillbit", e);
+ }
+ }
+ }
}
public void stopZookeeper() {
-
+ try {
+ this.zkCluster.shutdown();
+ } catch (IOException e) {
+ propagate(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
index a8ac41e..09a06d7 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
@@ -1,7 +1,12 @@
package org.apache.drill.exec.client;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
import org.apache.drill.exec.DrillSystemTestBase;
-import org.apache.drill.exec.server.StartupOptions;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.junit.After;
+import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -9,12 +14,39 @@ import org.junit.Test;
*/
public class DrillClientSystemTest extends DrillSystemTestBase {
- StartupOptions options = new StartupOptions();
+ private static String plan;
- @Test
- public void testSubmitQuery() {
- startCluster(options, 1);
+ @BeforeClass
+ public static void setUp() throws Exception {
+ DrillSystemTestBase.setUp();
+ plan = Resources.toString(Resources.getResource("simple_plan.json"), Charsets.UTF_8);
+ }
+ @After
+ public void tearDown() {
+ stopCluster();
+ stopZookeeper();
+ }
+ @Test
+ public void testSubmitPlanSingleNode() throws Exception {
+ startZookeeper(1);
+ startCluster(1);
+ DrillClient client = new DrillClient();
+ client.connect();
+ DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan);
+ System.out.println(result.get());
+ client.close();
+ }
+
+ @Test
+ public void testSubmitPlanTwoNodes() throws Exception {
+ startZookeeper(1);
+ startCluster(2);
+ DrillClient client = new DrillClient();
+ client.connect();
+ DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan);
+ System.out.println(result.get());
+ client.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..05011ac
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
@@ -0,0 +1,368 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+import java.io.*;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Mostly copied from HBase's MiniZooKeeperCluster. It no longer takes a Hadoop Configuration, but still uses Hadoop's FileUtil.
+ */
+public class MiniZooKeeperCluster {
+ private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
+
+ private static final int TICK_TIME = 2000;
+ private static final int CONNECTION_TIMEOUT = 30000;
+
+ private boolean started;
+
+ /**
+ * The default port. If zero, we use a random port.
+ */
+ private int defaultClientPort = 0;
+
+ private int clientPort;
+
+ private List<NIOServerCnxnFactory> standaloneServerFactoryList;
+ private List<ZooKeeperServer> zooKeeperServers;
+ private List<Integer> clientPortList;
+
+ private int activeZKServerIndex;
+ private int tickTime = 0;
+
+  /**
+   * Creates an empty, not-yet-started cluster. Call one of the
+   * {@code startup} methods to launch ZooKeeper servers.
+   */
+  public MiniZooKeeperCluster() {
+    standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
+    zooKeeperServers = new ArrayList<ZooKeeperServer>();
+    clientPortList = new ArrayList<Integer>();
+    // -1 means no server has been chosen as the active one yet.
+    activeZKServerIndex = -1;
+    started = false;
+  }
+
+  /**
+   * Fixes the client port that {@code selectClientPort} returns instead of a
+   * random one.
+   *
+   * @param clientPort the port to use; must be strictly positive
+   * @throws IllegalArgumentException if {@code clientPort} is zero or negative
+   */
+  public void setDefaultClientPort(int clientPort) {
+    final boolean validPort = clientPort > 0;
+    if (!validPort) {
+      throw new IllegalArgumentException("Invalid default ZK client port: " + clientPort);
+    }
+    defaultClientPort = clientPort;
+  }
+
+  /**
+   * Chooses the first client port to try. The default port wins when one has
+   * been set; otherwise a random port between 49152 (0xc000) and 65279 is
+   * drawn. That range lies inside the dynamic port range (49152 to 65535),
+   * whose ports cannot be registered with IANA and are intended for dynamic
+   * allocation (see http://bit.ly/dynports).
+   *
+   * @return the default port if positive, otherwise a random port
+   */
+  private int selectClientPort() {
+    if (defaultClientPort <= 0) {
+      return 0xc000 + new Random().nextInt(0x3f00);
+    }
+    return defaultClientPort;
+  }
+
+ /**
+ * Overrides the tick time passed to the ZooKeeper servers created by startup().
+ * Values of zero or less leave the built-in default (TICK_TIME) in effect.
+ */
+ public void setTickTime(int tickTime) {
+ this.tickTime = tickTime;
+ }
+
+ /**
+ * Returns the number of registered servers minus one, i.e. the servers besides
+ * the active one; this is -1 while no server is registered.
+ */
+ public int getBackupZooKeeperServerNum() {
+ return zooKeeperServers.size() - 1;
+ }
+
+ /** Returns the number of ZooKeeper servers currently registered with this cluster. */
+ public int getZooKeeperServerNum() {
+ return zooKeeperServers.size();
+ }
+
+  /**
+   * Shrinks ZooKeeper's transaction-log preallocation for test runs.
+   * Taken from o.a.zk.t.ClientBase.
+   */
+  private static void setupTestEnv() {
+    // Tests run with a 100K preallocation: on Windows a 64M preallocation was
+    // seen to take ~15 seconds, which made the first client session time out.
+    // The value is set both as a system property and directly on FileTxnLog
+    // in order to handle static init/gc issues.
+    final long preallocBytes = 100 * 1024;
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(preallocBytes);
+  }
+
+ /**
+ * Starts a single ZooKeeper server under baseDir by delegating to
+ * startup(baseDir, 1).
+ *
+ * @return the value returned by startup(baseDir, 1)
+ */
+ public int startup(File baseDir) throws IOException, InterruptedException {
+ return startup(baseDir, 1);
+ }
+
+  /**
+   * Starts {@code numZooKeeperServers} standalone ZooKeeper servers, each
+   * keeping its data under {@code baseDir/zookeeper_<i>}. A cluster previously
+   * started by this instance is shut down first. The first server started
+   * becomes the active one.
+   *
+   * @param baseDir parent of the per-server data directories; each per-server
+   *                directory is deleted and recreated
+   * @param numZooKeeperServers number of servers to start
+   * @return the client port of the first (active) server, or -1 if
+   *         {@code numZooKeeperServers} is not positive
+   * @throws IOException if a server does not answer within the connection
+   *                     timeout; servers launched so far are shut down first
+   * @throws InterruptedException if starting a server is interrupted
+   */
+  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
+      InterruptedException {
+    if (numZooKeeperServers <= 0) {
+      return -1;
+    }
+
+    setupTestEnv();
+    shutdown();
+
+    int tentativePort = selectClientPort();
+    // The tick time does not change between servers, so pick it once.
+    final int tickTimeToUse = this.tickTime > 0 ? this.tickTime : TICK_TIME;
+
+    // running all the ZK servers
+    for (int i = 0; i < numZooKeeperServers; i++) {
+      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
+      recreateDir(dir);
+      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+      NIOServerCnxnFactory standaloneServerFactory;
+      while (true) {
+        try {
+          standaloneServerFactory = new NIOServerCnxnFactory();
+          standaloneServerFactory.configure(
+              new InetSocketAddress(tentativePort), 1000);
+        } catch (BindException e) {
+          LOG.debug("Failed binding ZK Server to client port: " +
+              tentativePort);
+          // This port is already in use, try to use another.
+          tentativePort++;
+          continue;
+        }
+        break;
+      }
+
+      // Start up this ZK server
+      standaloneServerFactory.startup(server);
+      if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
+        // shutdown() does nothing until 'started' is set, so release everything
+        // launched so far here instead of leaking sockets and threads.
+        standaloneServerFactory.shutdown();
+        for (NIOServerCnxnFactory launched : standaloneServerFactoryList) {
+          launched.shutdown();
+        }
+        standaloneServerFactoryList.clear();
+        clientPortList.clear();
+        zooKeeperServers.clear();
+        throw new IOException("Waiting for startup of standalone server");
+      }
+
+      // We have selected this port as a client port.
+      clientPortList.add(tentativePort);
+      standaloneServerFactoryList.add(standaloneServerFactory);
+      zooKeeperServers.add(server);
+    }
+
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
+    started = true;
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+        "on client port: " + clientPort);
+    return clientPort;
+  }
+
+  /**
+   * Makes sure {@code dir} exists: an existing directory is first deleted with
+   * Hadoop's FileUtil.fullyDelete, then the directory is created again.
+   *
+   * @param dir directory to wipe and create
+   * @throws IOException if the directory cannot be created
+   */
+  private void recreateDir(File dir) throws IOException {
+    if (dir.exists() && !FileUtil.fullyDelete(dir)) {
+      // Keep going: a leftover directory is still usable. Report it, because
+      // stale data can make later failures hard to explain.
+      LOG.warn("Could not fully delete " + dir);
+    }
+    boolean created;
+    try {
+      created = dir.mkdirs();
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+    // mkdirs() signals failure through its return value rather than an
+    // exception. It also returns false when the directory already exists,
+    // hence the isDirectory() check.
+    if (!created && !dir.isDirectory()) {
+      throw new IOException("creating dir: " + dir);
+    }
+  }
+
+  /**
+   * Stops every server of this cluster and resets the bookkeeping. Does
+   * nothing when the cluster has not been started.
+   *
+   * @throws IOException if a server is still reachable after the connection
+   *                     timeout
+   */
+  public void shutdown() throws IOException {
+    if (started) {
+      // stop the servers one by one, waiting for each port to go quiet
+      int index = 0;
+      for (NIOServerCnxnFactory factory : standaloneServerFactoryList) {
+        int port = clientPortList.get(index);
+        index++;
+
+        factory.shutdown();
+        boolean down = waitForServerDown(port, CONNECTION_TIMEOUT);
+        if (!down) {
+          throw new IOException("Waiting for shutdown of standalone server");
+        }
+      }
+
+      // forget everything about the stopped servers
+      started = false;
+      activeZKServerIndex = 0;
+      standaloneServerFactoryList.clear();
+      clientPortList.clear();
+      zooKeeperServers.clear();
+
+      LOG.info("Shutdown MiniZK cluster with all ZK servers");
+    }
+  }
+
+  /**
+   * Shuts down the active ZooKeeper server and, if another server is left,
+   * makes the next one in the list the active server.
+   *
+   * @return the client port of the newly active server, or -1 if the cluster
+   *         is not started or no server is left after the kill
+   * @throws IOException if the killed server is still reachable after the
+   *                     connection timeout
+   * @throws InterruptedException never thrown by the current body; kept in the
+   *                              signature for existing callers
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException,
+      InterruptedException {
+    if (!started || activeZKServerIndex < 0) {
+      return -1;
+    }
+
+    // Shutdown the current active one
+    NIOServerCnxnFactory activeFactory =
+        standaloneServerFactoryList.get(activeZKServerIndex);
+    int killedPort = clientPortList.get(activeZKServerIndex);
+
+    activeFactory.shutdown();
+    if (!waitForServerDown(killedPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    // remove the current active zk server
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);
+    LOG.info("Kill the current active ZK servers in the cluster " +
+        "on client port: " + killedPort);
+
+    if (standaloneServerFactoryList.isEmpty()) {
+      // No backup is left. Mark that nothing is active so a repeated call
+      // returns -1 instead of indexing into the now-empty lists.
+      activeZKServerIndex = -1;
+      return -1;
+    }
+    // Update the field (not a shadowing local) so getClientPort() follows the
+    // fail-over to the backup server.
+    this.clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+        "on client port: " + this.clientPort);
+    // return the next backup zk server's port
+    return this.clientPort;
+  }
+
+  /**
+   * Kills one backup ZK server: the one stored right after the active server.
+   * Does nothing when the cluster is not started, no server is active, or
+   * there is at most one server.
+   *
+   * @throws IOException if the server is still reachable after the connection
+   *                     timeout
+   * @throws InterruptedException declared in the signature
+   */
+  public void killOneBackupZooKeeperServer() throws IOException,
+      InterruptedException {
+    final boolean hasBackup = started && activeZKServerIndex >= 0
+        && standaloneServerFactoryList.size() > 1;
+    if (!hasBackup) {
+      return;
+    }
+
+    final int backupIndex = activeZKServerIndex + 1;
+    // Shutdown the backup server
+    NIOServerCnxnFactory backupFactory = standaloneServerFactoryList.get(backupIndex);
+    int backupPort = clientPortList.get(backupIndex);
+
+    backupFactory.shutdown();
+    boolean down = waitForServerDown(backupPort, CONNECTION_TIMEOUT);
+    if (!down) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    // remove this backup zk server from the bookkeeping
+    standaloneServerFactoryList.remove(backupIndex);
+    clientPortList.remove(backupIndex);
+    zooKeeperServers.remove(backupIndex);
+    LOG.info("Kill one backup ZK servers in the cluster " +
+        "on client port: " + backupPort);
+  }
+
+  /**
+   * Polls {@code localhost:port} every 250 ms until a connection or write
+   * fails, which is taken to mean the server is down.
+   * Taken from o.a.zk.t.ClientBase.
+   *
+   * @param port client port to probe
+   * @param timeout maximum time to keep polling, in milliseconds
+   * @return true once the port stops answering; false if it still answers
+   *         after {@code timeout}, or if the waiting thread is interrupted
+   */
+  private static boolean waitForServerDown(int port, long timeout) {
+    final long deadline = System.currentTimeMillis() + timeout;
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          // Encode explicitly so the command bytes do not depend on the
+          // platform default charset.
+          outstream.write("stat".getBytes(java.nio.charset.Charset.forName("US-ASCII")));
+          outstream.flush();
+        } finally {
+          sock.close();
+        }
+      } catch (IOException e) {
+        // The port can no longer be reached: the server is down.
+        return true;
+      }
+
+      if (System.currentTimeMillis() > deadline) {
+        return false;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag for the caller and stop waiting. Looping
+        // on would make every following sleep() fail immediately.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Polls {@code localhost:port} every 250 ms, sending "stat" and waiting for
+   * a reply whose first line starts with "Zookeeper version:".
+   * Taken from o.a.zk.t.ClientBase.
+   *
+   * @param port client port to probe
+   * @param timeout maximum time to keep polling, in milliseconds
+   * @return true once the server answers as expected; false if it has not
+   *         done so after {@code timeout}, or if the waiting thread is
+   *         interrupted
+   */
+  private static boolean waitForServerUp(int port, long timeout) {
+    // Encode and decode explicitly so the exchange does not depend on the
+    // platform default charset.
+    final java.nio.charset.Charset ascii = java.nio.charset.Charset.forName("US-ASCII");
+    final long deadline = System.currentTimeMillis() + timeout;
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes(ascii));
+          outstream.flush();
+
+          BufferedReader reader =
+              new BufferedReader(new InputStreamReader(sock.getInputStream(), ascii));
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          // Closing the socket also closes its streams, so the reader needs
+          // no separate close.
+          sock.close();
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > deadline) {
+        return false;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag for the caller and stop waiting. Looping
+        // on would make every following sleep() fail immediately.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+  }
+
+ /**
+ * Returns the clientPort field, which startup() sets to the port of the first
+ * server it started.
+ */
+ public int getClientPort() {
+ return clientPort;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
new file mode 100644
index 0000000..7c97f66
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
@@ -0,0 +1,28 @@
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+// Settings under drill.exec for the test resources of this module.
+drill.exec: {
+ cluster-id: "drillbits1"
+ rpc: {
+ user.port : 31010,
+ bit.port : 32010
+ },
+ optimizer: {
+ implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+ },
+
+ // NOTE(review): MiniZooKeeperCluster picks a random client port unless
+ // setDefaultClientPort() is called; confirm the test base pins it to the
+ // port used in "connect" below.
+ zk: {
+ connect: "localhost:2181",
+ root: "/drill",
+ refresh: 500,
+ timeout: 1000,
+ retry: {
+ count: 7200,
+ delay: 500
+ }
+ }
+
+ network: {
+ start: 35000
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json b/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json
new file mode 100644
index 0000000..2457b1f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json
@@ -0,0 +1,133 @@
+{
+ head:{
+ type:"apache_drill_logical_plan",
+ version:"1",
+ generator:{
+ type:"manual",
+ info:"na"
+ }
+ },
+ storage:[
+ {
+ type:"text",
+ name:"local-logs",
+ file: "local://logs/*.log",
+ compress:"gzip",
+ line-delimiter:"\n",
+ record-maker:{
+ type:"first-row",
+ delimiter:","
+ }
+ },
+ {
+ type:"mongo",
+ name:"users",
+ connection:"mongodb://blue:red@localhost/users"
+ },
+ {
+ type:"mysql",
+ name:"mysql",
+ connection:"jdbc:mysql://localhost/main"
+ }
+ ],
+ query:[
+ {
+ @id:"1",
+ op:"scan",
+ memo:"initial_scan",
+ storageengine:"local-logs",
+ selection: {}
+ },
+ {
+ @id:"2",
+ input:"1",
+ memo:"transform1",
+ op:"transform",
+ transforms:[
+ {
+ ref:"userId",
+ expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+ },
+ {
+ ref:"session",
+ expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+ }
+ ]
+ },
+ {
+ @id:"3",
+ input:"2",
+ memo:"transform2",
+ op:"transform",
+ transforms:[
+ {
+ ref:"userId",
+ expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+ },
+ {
+ ref:"session",
+ expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+ }
+ ]
+ },
+ {
+ @id:"7",
+ input:"3",
+ op:"sequence",
+ do:[
+ {
+ op:"transform",
+ memo:"seq_transform",
+ transforms:[
+ {
+ ref:"happy",
+ expr:"regex_like('ep2', \"dink\")"
+ }
+ ]
+ }
+ ,
+ {
+ op:"transform",
+ memo:"last_transform",
+ transforms:[
+ {
+ ref:"abc",
+ expr:"123"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ @id:"10",
+ input:"3",
+ op:"transform",
+ memo:"t3",
+ transforms:[
+ {
+ ref:"happy",
+ expr:"regex_like('ep2', \"dink\")"
+ }
+ ]
+ },
+ {
+ @id:12,
+ op:"join",
+ type: "inner",
+ left:"7",
+ right:"10",
+ conditions: [{relationship:"==", left: "1", right: "1" }]
+ }
+ ,
+ {
+ input: 12,
+ op: "store",
+ memo: "output sink",
+ target: {
+ file: "console:///stdout"
+ }
+
+ }
+
+
+ ]
+}
\ No newline at end of file