You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:38 UTC
[17/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
index 85c573d..75dce2c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
@@ -23,16 +23,19 @@ import static com.google.common.collect.Collections2.transform;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Function;
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.state.ConnectionState;
+import com.netflix.curator.framework.state.ConnectionStateListener;
import com.netflix.curator.retry.RetryNTimes;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -52,6 +55,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
private ServiceCache<DrillbitEndpoint> serviceCache;
private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList();
private final String serviceName;
+ private final CountDownLatch initialConnection = new CountDownLatch(1);
public ZKClusterCoordinator(DrillConfig config) throws IOException {
@@ -64,6 +68,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
.retryPolicy(rp)
.connectString(config.getString(ExecConstants.ZK_CONNECTION))
.build();
+ curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
discovery = getDiscovery();
serviceCache = discovery.
serviceCacheBuilder()
@@ -72,15 +77,36 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
.build();
}
- public void start() throws Exception {
+ public void start(long millisToWait) throws Exception { // Starts the ZK components, then blocks until the initial connection is reported; 0 means wait without a time limit.
logger.debug("Starting ZKClusterCoordination.");
curator.start();
discovery.start();
serviceCache.start();
serviceCache.addListener(new ZKListener());
+
+ if(millisToWait != 0){ // bounded wait: give up after millisToWait milliseconds
+ boolean success = this.initialConnection.await(millisToWait, TimeUnit.MILLISECONDS); // NOTE(review): a negative millisToWait also takes this branch; CountDownLatch.await does not wait at all for non-positive timeouts - confirm callers never pass one
+ if(!success) throw new IOException(String.format("Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.", millisToWait));
+ }else{ // unbounded wait: block until the latch is counted down
+ this.initialConnection.await();
+ }
+
+
updateEndpoints();
}
+
+ private class InitialConnectionListener implements ConnectionStateListener{ // Counts down the initialConnection latch the first time the client reports CONNECTED.
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if(newState == ConnectionState.CONNECTED){ // every other state is ignored by this listener
+ ZKClusterCoordinator.this.initialConnection.countDown(); // releases the await on this latch in start()
+ client.getConnectionStateListenable().removeListener(this); // unregister: nothing further is done with later state changes
+ }
+ }
+
+ }
+
private class ZKListener implements ServiceCacheListener {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java
new file mode 100644
index 0000000..9c18e51
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+public class BitComException extends DrillException{ // Each constructor below forwards its arguments unchanged to DrillException.
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComException.class);
+
+ public BitComException() {
+ super();
+ }
+
+ public BitComException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+ public BitComException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public BitComException(String message) {
+ super(message);
+ }
+
+ public BitComException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
deleted file mode 100644
index a4899bd..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.exception;
-
-import org.apache.drill.common.exceptions.DrillException;
-
-public class ExecutionSetupException extends DrillException{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionSetupException.class);
-
- public ExecutionSetupException() {
- super();
- }
-
- public ExecutionSetupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
- public ExecutionSetupException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExecutionSetupException(String message) {
- super(message);
- }
-
- public ExecutionSetupException(Throwable cause) {
- super(cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
index c273463..dbd66b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.exception;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+
public class FragmentSetupException extends ExecutionSetupException{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentSetupException.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java
deleted file mode 100644
index 30e7a63..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-public interface CancelableQuery {
- public boolean cancel(long queryid);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java
deleted file mode 100644
index 4e4ec77..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-public class ExecutionPlanner {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionPlanner.class);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java
deleted file mode 100644
index f138171..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-
-public class Foreman extends Thread{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-
- public Foreman(){
-
- }
-
- public void doWork(QueryWorkUnit work){
- // generate fragment structure.
- // store fragments in distributed grid.
- // generate any codegen required and store in grid.
- // drop
- // do get on the result set you're looking for. Do the initial get on the result node you're looking for. This will return either data or a metadata record set
- }
-
- public boolean checkStatus(long queryId){
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java
deleted file mode 100644
index bdf4a1e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-
-import com.google.common.base.Preconditions;
-
-public class QueryWorkUnit {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
-
- private PlanFragment rootFragment; // for local
- private List<PlanFragment> fragments;
-
- public QueryWorkUnit(PlanFragment rootFragment, List<PlanFragment> fragments) {
- super();
- Preconditions.checkNotNull(rootFragment);
- Preconditions.checkNotNull(fragments);
- this.rootFragment = rootFragment;
- this.fragments = fragments;
- }
-
- public PlanFragment getRootFragment() {
- return rootFragment;
- }
-
- public List<PlanFragment> getFragments() {
- return fragments;
- }
-
-
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java
deleted file mode 100644
index 96d7d1e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-public class ResourceRequest {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceRequest.class);
-
- public long memoryMin;
- public long memoryDesired;
-
-
- public static class ResourceAllocation {
- public long memory;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java
deleted file mode 100644
index fee6172..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-
-public interface StatusProvider {
- public FragmentStatus getStatus();
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 2b3f574..6bddab7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -19,11 +19,10 @@ package org.apache.drill.exec.memory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
import java.io.Closeable;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.common.config.DrillConfig;
/**
* Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. Also allows inser
@@ -48,7 +47,7 @@ public abstract class BufferAllocator implements Closeable{
@Override
public abstract void close();
- public static BufferAllocator getAllocator(DrillbitContext context){
+ public static BufferAllocator getAllocator(DrillConfig config){
// TODO: support alternative allocators (including a debugging allocator that records all allocation locations for each buffer).
return new DirectBufferAllocator();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
index 6b89c12..1f47041 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
@@ -22,34 +22,34 @@ import org.apache.drill.exec.server.DrillbitContext;
import com.yammer.metrics.Counter;
/**
- * Wraps a parent counter so that local in thread metrics can be collected while collecting for a global counter.
+ * Wraps a parent counter so that local in-thread metrics can be collected while collecting for a global counter. Note
+ * that this is one-writer, many-reader safe.
*/
public class SingleThreadNestedCounter {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleThreadNestedCounter.class);
-
+
private volatile long count;
private final Counter counter;
-
-
+
public SingleThreadNestedCounter(DrillbitContext context, String name) {
super();
this.counter = context.getMetrics().counter(name);
}
- public long inc(long n){
+ public long inc(long n) {
counter.inc(n);
- count+= n;
+ count += n;
return count;
}
-
- public long dec(long n){
+
+ public long dec(long n) {
counter.dec(n);
count -= n;
return count;
}
-
- public long get(){
+
+ public long get() {
return count;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java
deleted file mode 100644
index f626cea..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.vector.SelectionVector;
-
-public abstract class FilteringRecordBatchTransformer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilteringRecordBatchTransformer.class);
-
- final RecordBatch incoming;
- final SelectionVector selectionVector;
- final BatchSchema schema;
-
- public FilteringRecordBatchTransformer(RecordBatch incoming, OutputMutator output, SelectionVector selectionVector) {
- super();
- this.incoming = incoming;
- this.selectionVector = selectionVector;
- this.schema = innerSetup();
- }
-
- public abstract BatchSchema innerSetup();
-
- /**
- * Applies the filter to the selection index. Ignores any values in the selection vector, instead creating a.
- * @return
- */
- public abstract int apply();
-
- /**
- * Applies the filter to the selection index. Utilizes the existing selection index and only evaluates on those records.
- * @return
- */
- public abstract int applyWithSelection();
-
- public BatchSchema getSchema() {
- return schema;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 0cf17e9..e64453c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -20,10 +20,15 @@ package org.apache.drill.exec.ops;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
-import org.apache.drill.exec.planner.FragmentRunnable;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.physical.impl.FilteringRecordBatchTransformer;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
import com.yammer.metrics.MetricRegistry;
import com.yammer.metrics.Timer;
@@ -34,51 +39,72 @@ import com.yammer.metrics.Timer;
public class FragmentContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
- private final static String METRIC_TIMER_FRAGMENT_TIME = MetricRegistry.name(FragmentRunnable.class, "completionTimes");
- private final static String METRIC_BATCHES_COMPLETED = MetricRegistry.name(FragmentRunnable.class, "batchesCompleted");
- private final static String METRIC_RECORDS_COMPLETED = MetricRegistry.name(FragmentRunnable.class, "recordsCompleted");
- private final static String METRIC_DATA_PROCESSED = MetricRegistry.name(FragmentRunnable.class, "dataProcessed");
+ private final static String METRIC_TIMER_FRAGMENT_TIME = MetricRegistry.name(FragmentRunner.class, "completionTimes");
+ private final static String METRIC_BATCHES_COMPLETED = MetricRegistry.name(FragmentRunner.class, "batchesCompleted");
+ private final static String METRIC_RECORDS_COMPLETED = MetricRegistry.name(FragmentRunner.class, "recordsCompleted");
+ private final static String METRIC_DATA_PROCESSED = MetricRegistry.name(FragmentRunner.class, "dataProcessed");
private final DrillbitContext context;
- private final PlanFragment fragment;
public final SingleThreadNestedCounter batchesCompleted;
public final SingleThreadNestedCounter recordsCompleted;
public final SingleThreadNestedCounter dataProcessed;
public final Timer fragmentTime;
+ private final FragmentHandle handle;
+ private final UserClientConnection connection;
+ private final IncomingBuffers buffers;
- public FragmentContext(DrillbitContext dbContext, PlanFragment fragment) {
+ public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers) {
this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
this.batchesCompleted = new SingleThreadNestedCounter(dbContext, METRIC_BATCHES_COMPLETED);
this.recordsCompleted = new SingleThreadNestedCounter(dbContext, METRIC_RECORDS_COMPLETED);
this.dataProcessed = new SingleThreadNestedCounter(dbContext, METRIC_DATA_PROCESSED);
this.context = dbContext;
- this.fragment = fragment;
+ this.connection = connection;
+ this.handle = handle;
+ this.buffers = buffers;
}
public void fail(Throwable cause) {
}
+
public DrillbitContext getDrillbitContext(){
return context;
}
-
- public PlanFragment getFragment() {
- return fragment;
+
+ public DrillbitEndpoint getIdentity(){
+ return context.getEndpoint();
}
+ public FragmentHandle getHandle() {
+ return handle;
+ }
+
public BufferAllocator getAllocator(){
// TODO: A local query allocator to ensure memory limits and accurately gauge memory usage.
return context.getAllocator();
}
-
public FilteringRecordBatchTransformer getFilteringExpression(LogicalExpression expr){
return null;
}
+ public void addMetricsToStatus(FragmentStatus.Builder stats){
+ stats.setBatchesCompleted(batchesCompleted.get());
+ stats.setDataProcessed(dataProcessed.get());
+ stats.setRecordsCompleted(recordsCompleted.get());
+ }
+ public UserClientConnection getConnection() {
+ return connection;
+ }
+
public BitCom getCommunicator(){
- return null;
+ return context.getBitCom();
+ }
+
+ public IncomingBuffers getBuffers(){
+ return buffers;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java
deleted file mode 100644
index 3c75648..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-
-public class FragmentConverter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentConverter.class);
-
- public static FragmentRoot getFragment(FragmentContext context){
- PlanFragment m = context.getFragment();
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java
deleted file mode 100644
index ddacb41..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-
-/**
- * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
- * output nodes and storage nodes. They are there driving force behind the completion of a query.
- */
-public interface FragmentRoot {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRoot.class);
-
- /**
- * Do the next batch of work.
- * @return Whether or not additional batches of work are necessary.
- */
- public boolean next();
-
-
- public void setup() throws FragmentSetupException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java
deleted file mode 100644
index 8d4e807..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-public class OperatorFactory {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorFactory.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
deleted file mode 100644
index 59abdc4..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.vector.ValueVector;
-
-public interface OutputMutator {
- public void removeField(int fieldId) throws SchemaChangeException;
- public void addField(int fieldId, ValueVector<?> vector) throws SchemaChangeException ;
- public void setNewSchema(BatchSchema schema) throws SchemaChangeException ;
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fe37e70..fd24deb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,33 +19,42 @@ package org.apache.drill.exec.ops;
import java.util.Collection;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.server.DrillbitContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
public class QueryContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
- private long queryId;
+ private QueryId queryId;
private DrillbitContext drillbitContext;
- public QueryContext(long queryId, DrillbitContext drllbitContext) {
+ public QueryContext(QueryId queryId, DrillbitContext drllbitContext) {
super();
this.queryId = queryId;
this.drillbitContext = drllbitContext;
}
- public long getQueryId() {
- return queryId;
+ public DrillbitEndpoint getCurrentEndpoint(){
+ return drillbitContext.getEndpoint();
}
- public ObjectMapper getMapper(){
- return drillbitContext.getConfig().getMapper();
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public DistributedCache getCache(){
+ return drillbitContext.getCache();
}
public Collection<DrillbitEndpoint> getActiveEndpoints(){
return drillbitContext.getBits();
}
+ public PhysicalPlanReader getPlanReader(){
+ return drillbitContext.getPlanReader();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
deleted file mode 100644
index b46804f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import java.util.Iterator;
-
-import org.apache.drill.exec.exception.ExecutionSetupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.vector.ValueVector;
-import org.apache.drill.exec.store.RecordReader;
-
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.carrotsearch.hppc.procedures.IntObjectProcedure;
-
-/**
- * Record batch used for a particular scan. Operators against one or more
- */
-public abstract class ScanBatch implements RecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
-
- private IntObjectOpenHashMap<ValueVector<?>> fields = new IntObjectOpenHashMap<ValueVector<?>>();
- private BatchSchema schema;
- private int recordCount;
- private boolean schemaChanged = true;
- private final FragmentContext context;
- private Iterator<RecordReader> readers;
- private RecordReader currentReader;
- private final BatchSchema expectedSchema;
- private final Mutator mutator = new Mutator();
-
- public ScanBatch(BatchSchema expectedSchema, Iterator<RecordReader> readers, FragmentContext context)
- throws ExecutionSetupException {
- this.expectedSchema = expectedSchema;
- this.context = context;
- this.readers = readers;
- if (!readers.hasNext()) throw new ExecutionSetupException("A scan batch must contain at least one reader.");
- this.currentReader = readers.next();
- this.currentReader.setup(expectedSchema, mutator);
- }
-
- private void schemaChanged() {
- schema = null;
- schemaChanged = true;
- }
-
- @Override
- public FragmentContext getContext() {
- return context;
- }
-
- @Override
- public BatchSchema getSchema() {
- return schema;
- }
-
- @Override
- public int getRecordCount() {
- return recordCount;
- }
-
- @Override
- public void kill() {
- releaseAssets();
- }
-
- private void releaseAssets() {
- fields.forEach(new IntObjectProcedure<ValueVector<?>>() {
- @Override
- public void apply(int key, ValueVector<?> value) {
- value.close();
- }
- });
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
- if (fields.containsKey(fieldId))
- throw new InvalidValueAccessor(String.format("Unknown value accesor for field id %d."));
- ValueVector<?> vector = this.fields.lget();
- if (vector.getClass().isAssignableFrom(clazz)) {
- return (T) vector;
- } else {
- throw new InvalidValueAccessor(String.format(
- "You requested a field accessor of type %s for field id %d but the actual type was %s.",
- clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
- }
- }
-
- @Override
- public IterOutcome next() {
- while ((recordCount = currentReader.next()) == 0) {
- try {
- if (!readers.hasNext()) {
- currentReader.cleanup();
- releaseAssets();
- return IterOutcome.NONE;
- }
- currentReader.cleanup();
- currentReader = readers.next();
- currentReader.setup(expectedSchema, mutator);
- } catch (ExecutionSetupException e) {
- this.context.fail(e);
- releaseAssets();
- return IterOutcome.STOP;
- }
- }
-
- if (schemaChanged) {
- schemaChanged = false;
- return IterOutcome.OK_NEW_SCHEMA;
- } else {
- return IterOutcome.OK;
- }
- }
-
- private class Mutator implements OutputMutator {
-
- public void removeField(int fieldId) throws SchemaChangeException {
- schemaChanged();
- ValueVector<?> v = fields.remove(fieldId);
- if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
- v.close();
- }
-
- public void addField(int fieldId, ValueVector<?> vector) {
- schemaChanged();
- ValueVector<?> v = fields.put(fieldId, vector);
- if (v != null) v.close();
- }
-
- @Override
- public void setNewSchema(BatchSchema schema) {
- ScanBatch.this.schema = schema;
- ScanBatch.this.schemaChanged = true;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
deleted file mode 100644
index 0fc7a1f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-/**
- * Works on one incoming batch at a time. Creates one output batch for each input batch.
- */
-public class StreamingRecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingRecordBatch.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
deleted file mode 100644
index 07d7099..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-public class ExchangeRecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeRecordBatch.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
deleted file mode 100644
index 0e35932..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-public class RecordBatchSender {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSender.class);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
deleted file mode 100644
index 5bef612..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.filter;
-
-import org.apache.drill.exec.ops.FilteringRecordBatchTransformer;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.vector.SelectionVector;
-import org.apache.drill.exec.record.vector.ValueVector;
-
-public abstract class FilterRecordBatch implements RecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
-
- private RecordBatch incoming;
- private SelectionVector selectionVector;
- private BatchSchema schema;
- private FilteringRecordBatchTransformer transformer;
- private int outstanding;
-
- public FilterRecordBatch(RecordBatch batch) {
- this.incoming = batch;
- }
-
- @Override
- public FragmentContext getContext() {
- return incoming.getContext();
- }
-
- @Override
- public BatchSchema getSchema() {
- return schema;
- }
-
- @Override
- public int getRecordCount() {
- return 0;
- }
-
- @Override
- public void kill() {
- incoming.kill();
- }
-
- @Override
- public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
- return null;
- }
-
- abstract int applyFilter(SelectionVector vector, int count);
-
- /**
- * Release all assets.
- */
- private void close() {
-
- }
-
- @Override
- public IterOutcome next() {
- while (true) {
- IterOutcome o = incoming.next();
- switch (o) {
- case OK_NEW_SCHEMA:
- transformer = incoming.getContext().getFilteringExpression(null);
- schema = transformer.getSchema();
- // fall through to ok.
- case OK:
-
- case NONE:
- case STOP:
- close();
- return IterOutcome.STOP;
- }
-
- if (outstanding > 0) {
- // move data to output location.
-
- for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
-
- }
- }
-
- // make sure the bit vector is as large as the current record batch.
- if (selectionVector.size() < incoming.getRecordCount()) {
- selectionVector.allocateNew(incoming.getRecordCount());
- }
-
- return null;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
deleted file mode 100644
index 218a19a..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.filter;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.DirectBufferAllocator;
-import org.apache.drill.exec.record.vector.NullableInt32Vector;
-import org.apache.drill.exec.record.vector.UInt16Vector;
-import org.codehaus.janino.ExpressionEvaluator;
-
-public class SelectionVectorUpdater {
- //static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorUpdater.class);
-
- // Add a selection vector to a record batch.
- /**
- * where a + b < 10
- */
-
- public static int applyToBatch(final int recordCount, final NullableInt32Vector a, final NullableInt32Vector b,
- final UInt16Vector selectionVector) {
- int selectionIndex = 0;
- for (int i = 0; i < recordCount; i++) {
- int isNotNull = a.isNull(i) * b.isNull(i);
- if (isNotNull > 0 && a.get(i) + b.get(i) < 10) {
- selectionVector.set(selectionIndex, (char) i);
- selectionIndex++;
- }
- }
- return selectionIndex;
- }
-
- public static void mai2n(String[] args) {
- int size = 1024;
- BufferAllocator allocator = new DirectBufferAllocator();
- NullableInt32Vector a = new NullableInt32Vector(0, allocator);
- NullableInt32Vector b = new NullableInt32Vector(1, allocator);
- UInt16Vector select = new UInt16Vector(2, allocator);
- a.allocateNew(size);
- b.allocateNew(size);
- select.allocateNew(size);
- int r = 0;
- for (int i = 0; i < 1500; i++) {
- r += applyToBatch(size, a, b, select);
- }
-
- System.out.println(r);
- }
-
-public static void main(String[] args) throws Exception{
- ExpressionEvaluator ee = new ExpressionEvaluator(
- "c > d ? c : d", // expression
- int.class, // expressionType
- new String[] { "c", "d" }, // parameterNames
- new Class[] { int.class, int.class } // parameterTypes
- );
-
- Integer res = (Integer) ee.evaluate(
- new Object[] { // parameterValues
- new Integer(10),
- new Integer(11),
- }
- );
- System.out.println("res = " + res);
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
index 70a42be..d2aaca3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
@@ -19,8 +19,7 @@ package org.apache.drill.exec.opt;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.common.optimize.Optimizer;
-import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.PhysicalPlan;
public class IdentityOptimizer extends Optimizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
new file mode 100644
index 0000000..9f506c1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.opt;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+/**
+ * Base class for components that turn a {@link LogicalPlan} into a {@link PhysicalPlan}.
+ *
+ * <p>The concrete implementation is chosen through configuration; see
+ * {@link #getOptimizer(DrillConfig)}.
+ */
+public abstract class Optimizer implements Closeable {
+
+  /**
+   * Configuration key whose value names the {@code Optimizer} implementation to instantiate.
+   * Declared {@code final} so the key cannot be reassigned at runtime.
+   */
+  public static final String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
+
+  /**
+   * Initializes this optimizer. {@link #getOptimizer(DrillConfig)} calls this once, right after
+   * the instance has been created.
+   *
+   * @param config the configuration the optimizer was created from
+   */
+  public abstract void init(DrillConfig config);
+
+  /**
+   * Produces a physical plan for the given logical plan.
+   *
+   * @param context per-optimization settings supplied by the caller
+   * @param plan the logical plan to convert
+   * @return the resulting physical plan
+   */
+  public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan);
+
+  /**
+   * Closes this optimizer. Narrows {@link Closeable#close()} so that no checked exception is
+   * declared.
+   */
+  @Override
+  public abstract void close();
+
+  /**
+   * Instantiates the optimizer named under {@link #OPTIMIZER_IMPL_KEY} and initializes it with
+   * the same configuration.
+   *
+   * @param config configuration to read the implementation from and to pass to {@link #init}
+   * @return the initialized optimizer
+   * @throws DrillConfigurationException presumably when the configured implementation cannot be
+   *     instantiated by {@code config.getInstanceOf} -- confirm against {@code DrillConfig}
+   */
+  public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException {
+    final Optimizer optimizer = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
+    optimizer.init(config);
+    return optimizer;
+  }
+
+  /** Caller-supplied settings for a single {@link #optimize} call. */
+  public interface OptimizationContext {
+    /** @return the priority associated with this optimization request */
+    int getPriority();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java
new file mode 100644
index 0000000..334119d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+/** What to do when data does not match what was expected. */
+public enum DataValidationMode {
+
+  /** Terminate the query if the data doesn't match expected. */
+  TERMINATE,
+
+  /** Drop the record that doesn't match the expected situation. */
+  DROP_RECORD,
+
+  /** Record the failed record along with the rule violation in a secondary location. */
+  SINK_RECORD
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
new file mode 100644
index 0000000..d7b21db
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+
+/**
+ * Pairs a {@link DrillbitEndpoint} with a floating-point affinity value.
+ *
+ * <p>Natural ordering is by affinity only, ascending; the endpoint takes no part in the
+ * comparison.
+ */
+public class EndpointAffinity implements Comparable<EndpointAffinity> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class);
+
+  private DrillbitEndpoint endpoint;
+  private float affinity;
+
+  /**
+   * Creates an entry with an affinity of zero.
+   *
+   * @param endpoint the endpoint this affinity refers to
+   */
+  public EndpointAffinity(DrillbitEndpoint endpoint) {
+    this(endpoint, 0.0f);
+  }
+
+  /**
+   * Creates an entry with an explicit starting affinity.
+   *
+   * @param endpoint the endpoint this affinity refers to
+   * @param affinity the initial affinity value
+   */
+  public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
+    this.endpoint = endpoint;
+    this.affinity = affinity;
+  }
+
+  /** @return the endpoint this affinity refers to */
+  public DrillbitEndpoint getEndpoint() {
+    return this.endpoint;
+  }
+
+  /** Replaces the endpoint; the accumulated affinity is left untouched. */
+  public void setEndpoint(DrillbitEndpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+
+  /** @return the current affinity value */
+  public float getAffinity() {
+    return this.affinity;
+  }
+
+  /** Orders by affinity using {@link Float#compare(float, float)}. */
+  @Override
+  public int compareTo(EndpointAffinity o) {
+    return Float.compare(this.affinity, o.affinity);
+  }
+
+  /**
+   * Adds {@code f} to the current affinity.
+   *
+   * @param f the amount to add
+   */
+  public void addAffinity(float f) {
+    this.affinity = this.affinity + f;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java
new file mode 100644
index 0000000..ebe6446
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class OperatorCost {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCost.class);
+
+ private final float network;
+ private final float disk;
+ private final float memory;
+ private final float cpu;
+
+
+
+ @JsonCreator
+ public OperatorCost(@JsonProperty("network") float network, @JsonProperty("disk") float disk, @JsonProperty("memory") float memory, @JsonProperty("cpu") float cpu) {
+ super();
+ this.network = network;
+ this.disk = disk;
+ this.memory = memory;
+ this.cpu = cpu;
+ }
+
+ public float getNetwork() {
+ return network;
+ }
+
+ public float getDisk() {
+ return disk;
+ }
+
+ public float getMemory() {
+ return memory;
+ }
+
+ public float getCpu() {
+ return cpu;
+ }
+
+ public static OperatorCost combine(OperatorCost c1, OperatorCost c2){
+ return new OperatorCost(c1.network + c2.network, c1.disk + c2.disk, c1.memory + c2.memory, c1.cpu + c2.cpu);
+ }
+
+ public OperatorCost add(OperatorCost c2){
+ return combine(this, c2);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
new file mode 100644
index 0000000..84bfc87
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -0,0 +1,94 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.PlanProperties;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.graph.Graph;
+import org.apache.drill.common.graph.GraphAlgos;
+import org.apache.drill.exec.physical.base.Leaf;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Root;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+@JsonPropertyOrder({ "head", "graph" })
+public class PhysicalPlan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlan.class);
+
+ PlanProperties properties;
+
+ Graph<PhysicalOperator, Root, Leaf> graph;
+
+ @JsonCreator
+ public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){
+ this.properties = properties;
+ this.graph = Graph.newGraph(operators, Root.class, Leaf.class);
+ }
+
+ @JsonProperty("graph")
+ public List<PhysicalOperator> getSortedOperators(){
+ // reverse the list so that nested references are flattened rather than nested.
+ return getSortedOperators(true);
+ }
+
+ public List<PhysicalOperator> getSortedOperators(boolean reverse){
+ List<PhysicalOperator> list = GraphAlgos.TopoSorter.sort(graph);
+ if(reverse){
+ return Lists.reverse(list);
+ }else{
+ return list;
+ }
+
+ }
+
+
+ @JsonProperty("head")
+ public PlanProperties getProperties() {
+ return properties;
+ }
+
+ /** Parses a physical plan. */
+ public static PhysicalPlan parse(ObjectReader reader, String planString) {
+ try {
+ PhysicalPlan plan = reader.readValue(planString);
+ return plan;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Converts a physical plan to a string. (Opposite of {@link #parse}.) */
+ public String unparse(ObjectWriter writer) {
+ try {
+ return writer.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java
new file mode 100644
index 0000000..02fe025
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Describes a chunk of read work that will be done.
+ *
+ * <p>Both accessors are marked {@code @JsonIgnore}, so Jackson does not treat them as
+ * serializable properties.
+ */
+public interface ReadEntry {
+
+  /** @return the cost of performing this read */
+  @JsonIgnore
+  OperatorCost getCost();
+
+  /** @return the size of this read */
+  @JsonIgnore
+  Size getSize();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java
new file mode 100644
index 0000000..db3390a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.common.expression.types.DataType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RecordField {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordField.class);
+
+
+ private DataType type;
+ private ValueMode mode;
+
+ @JsonCreator
+ public RecordField(@JsonProperty("type") DataType type, @JsonProperty("mode") ValueMode mode) {
+ super();
+ this.type = type;
+ this.mode = mode;
+ }
+
+ public DataType getType() {
+ return type;
+ }
+
+ public ValueMode getMode() {
+ return mode;
+ }
+
+ public static enum ValueMode {
+ VECTOR,
+ DICT,
+ RLE
+ }
+
+ public static enum ValueType {
+ OPTIONAL,
+ REQUIRED,
+ REPEATED
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java
new file mode 100644
index 0000000..96bd996
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+/**
+ * Type for write entries. Declares no methods; the only member is a logger constant.
+ */
+public interface WriteEntry {
+  // Interface fields are implicitly public, static and final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriteEntry.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
new file mode 100644
index 0000000..e91257e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.exec.physical.OperatorCost;
+
+/**
+ * Skeleton {@link PhysicalOperator} that supplies visitor traversal and reports itself as
+ * executable.
+ */
+public abstract class AbstractBase implements PhysicalOperator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
+
+  /**
+   * Visits this operator and then every operator it iterates over: {@code enter} is called on
+   * this operator first, each iterated operator accepts the visitor in turn, and {@code leave}
+   * is called last.
+   *
+   * @param visitor the visitor to walk the operator tree with
+   * @throws IllegalArgumentException if {@link #iterator()} returns {@code null}
+   */
+  @Override
+  public void accept(GraphVisitor<PhysicalOperator> visitor) {
+    visitor.enter(this);
+
+    // Fail with a descriptive message instead of a bare NullPointerException in the loop below.
+    if (this.iterator() == null) {
+      throw new IllegalArgumentException("Null iterator for pop." + this);
+    }
+
+    for (PhysicalOperator child : this) {
+      child.accept(visitor);
+    }
+
+    visitor.leave(this);
+  }
+
+  /** @return always {@code true} in this base class */
+  @Override
+  public boolean isExecutable() {
+    return true;
+  }
+
+}