You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:39 UTC
[18/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java
deleted file mode 100644
index 58edf03..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.EndpointAffinity;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-
-
-public abstract class AbstractStore extends AbstractSingle implements Store{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStore.class);
-
- public AbstractStore(PhysicalOperator child) {
- super(child);
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
- return physicalVisitor.visitStore(this, value);
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java
deleted file mode 100644
index d779eb8..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.OperatorCost;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public interface Exchange extends PhysicalOperator{
-
- @JsonIgnore
- public abstract OperatorCost getAggregateSendCost();
-
- @JsonIgnore
- public abstract OperatorCost getAggregateReceiveCost();
-
- @JsonProperty("cost")
- public abstract ExchangeCost getExchangeCost();
-
- /**
- * Inform this Exchange node about its sender locations.
- * @param senderLocations
- */
- public abstract void setupSenders(List<DrillbitEndpoint> senderLocations);
-
- /**
- * Inform this Exchange node about its receiver locations.
- * @param receiverLocations
- */
- public abstract void setupReceivers(List<DrillbitEndpoint> receiverLocations);
-
- /**
- * Get the Sender associated with the given minorFragmentId.
- * @param minorFragmentId The minor fragment id, must be in the range [0, fragment.width).
- * @param child The feeding node for the requested sender.
- * @return The materialized sender for the given arguments.
- */
- public abstract Sender getSender(int minorFragmentId, PhysicalOperator child);
-
- /**
- * Get the Receiver associated with the given minorFragmentId.
- * @param minorFragmentId The minor fragment id, must be in the range [0, fragment.width).
- * @return The materialized receiver for the given arguments.
- */
- public abstract Receiver getReceiver(int minorFragmentId);
-
- public abstract int getMaxSendWidth();
-
- public PhysicalOperator getChild();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java
deleted file mode 100644
index f17203e..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.physical.OperatorCost;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class ExchangeCost {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeCost.class);
-
- private final OperatorCost send;
- private final OperatorCost receive;
- private final OperatorCost combined;
-
- @JsonCreator
- public ExchangeCost(@JsonProperty("send") OperatorCost send, @JsonProperty("receive") OperatorCost receive) {
- this.send = send;
- this.receive = receive;
- this.combined = OperatorCost.combine(send, receive);
- }
-
- @JsonIgnore
- public OperatorCost getCombinedCost(){
- return combined;
- }
-
- public OperatorCost getSend() {
- return send;
- }
-
- public OperatorCost getReceive() {
- return receive;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java
deleted file mode 100644
index 4557df4..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-
-/**
- * A POP which relies on no other nodes within the current fragment.
- */
-public interface FragmentLeaf extends PhysicalOperator{
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java
deleted file mode 100644
index 8d87d56..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-
-/**
- * Describes the root operation within a particular Fragment. This includes things like Sinks, and Sender nodes.
- */
-public interface FragmentRoot extends PhysicalOperator{
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java
deleted file mode 100644
index feb32ec..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.EndpointAffinity;
-
-public interface HasAffinity extends PhysicalOperator{
- public List<EndpointAffinity> getOperatorAffinity();
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java
deleted file mode 100644
index 28efb94..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-public interface Leaf extends PhysicalOperator{
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java
deleted file mode 100644
index d8d1b64..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.graph.GraphValue;
-import org.apache.drill.common.physical.OperatorCost;
-
-import com.fasterxml.jackson.annotation.JsonIdentityInfo;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.ObjectIdGenerators;
-
-@JsonInclude(Include.NON_NULL)
-@JsonPropertyOrder({ "@id" })
-@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop")
-public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
-
- public OperatorCost getCost();
-
- /**
- * Describes whether or not a particular physical operator can actually be executed. Most physical operators can be
- * executed. However, Exchange nodes cannot be executed. In order to be executed, they must be converted into their
- * Exec sub components.
- *
- * @return
- */
- @JsonIgnore
- public boolean isExecutable();
-
- /**
- * Provides capability to build a set of output based on traversing a query graph tree.
- * @param physicalVisitor
- * @return
- */
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E;
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java
deleted file mode 100644
index fb1fdcd..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.config.CommonConstants;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.PathScanner;
-
-public class PhysicalOperatorUtil {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalOperatorUtil.class);
-
- private PhysicalOperatorUtil(){}
-
- public synchronized static Class<?>[] getSubTypes(DrillConfig config){
- Class<?>[] ops = PathScanner.scanForImplementationsArr(PhysicalOperator.class, config.getStringList(CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES));
- logger.debug("Adding Physical Operator sub types: {}", ((Object) ops) );
- return ops;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java
deleted file mode 100644
index 2ecc6ce..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.physical.pop.Filter;
-import org.apache.drill.common.physical.pop.PartitionToRandomExchange;
-import org.apache.drill.common.physical.pop.Project;
-import org.apache.drill.common.physical.pop.Sort;
-
-public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalVisitor.class);
-
-
- public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
- public RETURN visitScan(Scan<?> scan, EXTRA value) throws EXCEP;
- public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
-
- public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
- public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
- public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
- public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
- public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
-
- public RETURN visitUnknown(PhysicalOperator op, EXTRA value) throws EXCEP;
-
- public RETURN visitPartitionToRandomExchange(PartitionToRandomExchange partitionToRandom, EXTRA value) throws EXCEP;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java
deleted file mode 100644
index db8f71f..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-public interface Receiver extends FragmentLeaf {
- public abstract List<DrillbitEndpoint> getProvidingEndpoints();
-
- /**
- * Whether or not this receiver supports out of order exchange. This provides a hint for the scheduling node on whether
- * the receiver can start work if only a subset of all sending endpoints are currently providing data. A random
- * receiver would support this form of operation. An NWAY receiver would not.
- *
- * @return True if this receiver supports working on a streaming/out of order input.
- */
- public abstract boolean supportsOutOfOrderExchange();
-
-
- public int getSenderCount();
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java
deleted file mode 100644
index c4f9982..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-/**
- * Marker interface describing the root of a query plan.
- */
-public interface Root extends PhysicalOperator{
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java
deleted file mode 100644
index c7b45a8..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.ReadEntry;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public interface Scan<R extends ReadEntry> extends Leaf, HasAffinity{
-
- @JsonProperty("entries")
- public abstract List<R> getReadEntries();
-
- public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
-
- public abstract Scan<?> getSpecificScan(int minorFragmentId);
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java
deleted file mode 100644
index 1859657..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-
-
-
-public interface Sender extends FragmentRoot{
- public abstract List<DrillbitEndpoint> getDestinations();
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java
deleted file mode 100644
index eec4a6c..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-public interface Store extends Root, HasAffinity{
-
- public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
- public abstract Store getSpecificStore(PhysicalOperator child, int minorFragmentId);
- public abstract int getMaxWidth();
- public abstract PhysicalOperator getChild();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/protobuf/Coordination.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/protobuf/Coordination.proto b/sandbox/prototype/common/src/main/protobuf/Coordination.proto
deleted file mode 100644
index f98d2c5..0000000
--- a/sandbox/prototype/common/src/main/protobuf/Coordination.proto
+++ /dev/null
@@ -1,26 +0,0 @@
-package exec;
-
-option java_package = "org.apache.drill.common.proto";
-option java_outer_classname = "CoordinationProtos";
-option optimize_for = LITE_RUNTIME;
-
-message DrillbitEndpoint{
- optional string address = 1;
- optional int32 user_port = 2;
- optional int32 bit_port = 3;
- optional Roles roles = 4;
-}
-
-message DrillServiceInstance{
- optional string id = 1;
- optional int64 registrationTimeUTC = 2;
- optional DrillbitEndpoint endpoint = 3;
-}
-
-message Roles{
- optional bool sql_query = 1 [default = true];
- optional bool logical_plan = 2 [default = true];
- optional bool physical_plan = 3 [default = true];
- optional bool java_executor = 4 [default = true];
- optional bool distributed_cache = 5 [default = true];
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java
deleted file mode 100644
index 1b042c5..0000000
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractScan;
-import org.apache.drill.common.physical.pop.base.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-scan")
-public class MockScanPOP extends AbstractScan<MockScanPOP.MockScanEntry>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanPOP.class);
-
- private final String url;
-
- @JsonCreator
- public MockScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
- super(readEntries);
- this.url = url;
- }
-
- public String getUrl() {
- return url;
- }
-
- public static class MockScanEntry implements ReadEntry{
- public int id;
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Scan<?> getSpecificScan(int minorFragmentId) {
- return this;
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java
deleted file mode 100644
index f48c539..0000000
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractStore;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.physical.pop.base.Store;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-store")
-public class MockStorePOP extends AbstractStore {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
-
- @JsonCreator
- public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
- super(child);
- }
-
- public int getMaxWidth() {
- return 1;
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
- return this;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java
deleted file mode 100644
index 0ad1f76..0000000
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.FileUtils;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-public class ParsePhysicalPlan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParsePhysicalPlan.class);
-
-
- @Test
- public void parseSimplePlan() throws Exception{
- DrillConfig c = DrillConfig.create();
- ObjectReader r = c.getMapper().reader(PhysicalPlan.class);
- ObjectWriter writer = c.getMapper().writer();
- PhysicalPlan plan = PhysicalPlan.parse(r, Files.toString(FileUtils.getResourceAsFile("/physical_test1.json"), Charsets.UTF_8));
- System.out.println(plan.unparse(writer));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/drill-module.conf b/sandbox/prototype/common/src/test/resources/drill-module.conf
index 86e828a..0e2c84e 100644
--- a/sandbox/prototype/common/src/test/resources/drill-module.conf
+++ b/sandbox/prototype/common/src/test/resources/drill-module.conf
@@ -1,2 +1 @@
drill.logical.storage.packages += "org.apache.drill.storage"
-drill.physical.operator.packages += "org.apache.drill.common.physical.pop"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/resources/physical_test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/physical_test1.json b/sandbox/prototype/common/src/test/resources/physical_test1.json
deleted file mode 100644
index 16bc87a..0000000
--- a/sandbox/prototype/common/src/test/resources/physical_test1.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
- head:{
- type:"APACHE_DRILL_PHYSICAL",
- version:"1",
- generator:{
- type:"manual"
- }
- },
- graph:[
- {
- @id:1,
- pop:"mock-scan",
- url: "http://apache.org",
- entries:[
- {id:1}
- ],
- cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
- },
- {
- @id:2,
- child: 1,
- pop:"filter",
- expr: "b > 5",
- cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
- },
- {
- @id: 3,
- child: 2,
- pop: "mock-store",
- cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index 9766df7..f5ece33 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -56,7 +56,7 @@
<groupId>org.apache.drill</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
- <classifier>test</classifier>
+ <classifier>tests</classifier>
</dependency>
<dependency>
<groupId>com.beust</groupId>
@@ -110,7 +110,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
- <version>4.0.0.CR1</version>
+ <version>4.0.0.CR2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java
deleted file mode 100644
index 82a8a85..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec;
-
-import java.util.Arrays;
-
-import com.google.common.base.Charsets;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.UnsignedBytes;
-
-public class ByteReorder {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ByteReorder.class);
-
- public static void main(String[] args){
- String[] strings = {"hello", "goodbye", "my friend"};
- byte[][] bytes = new byte[strings.length][];
- for(int i =0; i < strings.length; i++){
- bytes[i] = strings[i].getBytes(Charsets.UTF_8);
- }
-
- for(int i =0; i < bytes.length; i++){
- for(int v = 0; v < bytes[i].length; v++){
- bytes[i][v] = (byte) ~bytes[i][v];
- }
- }
-
- Arrays.sort(bytes, UnsignedBytes.lexicographicalComparator());
-
- for(int i =0; i < bytes.length; i++){
- for(int v = 0; v < bytes[i].length; v++){
- bytes[i][v] = (byte) ~bytes[i][v];
- }
- }
-
- for(int i =0; i < bytes.length; i++){
- System.out.println(new String(bytes[i]));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index 2928dbe..ba2c26b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -20,8 +20,9 @@ package org.apache.drill.exec.cache;
import java.io.Closeable;
import java.util.List;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
@@ -29,14 +30,11 @@ import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
public interface DistributedCache extends Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedCache.class);
- public void run(DrillbitEndpoint endpoint) throws DrillbitStartupException;
+ public void run() throws DrillbitStartupException;
- public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical);
- public TemplatizedPhysicalPlan getOptimizedPlan(TemplatizedLogicalPlan logical);
+// public void updateLocalQueueLength(int length);
+// public List<WorkQueueStatus> getQueueLengths();
- public void updateLocalQueueLength(int length);
- public List<WorkQueueStatus> getQueueLengths();
-
- public PlanFragment getFragment(long fragmentId);
+ public PlanFragment getFragment(FragmentHandle handle);
public void storeFragment(PlanFragment fragment);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 943031d..f4fdbfa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -17,20 +17,22 @@
******************************************************************************/
package org.apache.drill.exec.cache;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
+import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
import com.beust.jcommander.internal.Lists;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
@@ -38,39 +40,36 @@ import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
-import com.hazelcast.nio.DataSerializable;
public class HazelCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
private final String instanceName;
private HazelcastInstance instance;
- private ITopic<WrappedWorkQueueStatus> workQueueLengths;
- private DrillbitEndpoint endpoint;
+ private ITopic<HWorkQueueStatus> workQueueLengths;
+ private HandlePlan fragments;
private Cache<WorkQueueStatus, Integer> endpoints;
- private IMap<TemplatizedLogicalPlan, TemplatizedPhysicalPlan> optimizedPlans;
public HazelCache(DrillConfig config) {
this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
}
- private class Listener implements MessageListener<WrappedWorkQueueStatus>{
+ private class Listener implements MessageListener<HWorkQueueStatus>{
@Override
- public void onMessage(Message<WrappedWorkQueueStatus> wrapped) {
+ public void onMessage(Message<HWorkQueueStatus> wrapped) {
logger.debug("Received new queue length message.");
- endpoints.put(wrapped.getMessageObject().status, 0);
+ endpoints.put(wrapped.getMessageObject().get(), 0);
}
}
- public void run(DrillbitEndpoint endpoint) {
+ public void run() {
Config c = new Config();
c.setInstanceName(instanceName);
instance = getInstanceOrCreateNew(c);
workQueueLengths = instance.getTopic("queue-length");
- optimizedPlans = instance.getMap("plan-optimizations");
- this.endpoint = endpoint;
+ fragments = new HandlePlan(instance);
endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
workQueueLengths.addMessageListener(new Listener());
}
@@ -83,52 +82,16 @@ public class HazelCache implements DistributedCache {
return Hazelcast.newHazelcastInstance(c);
}
- @Override
- public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical) {
- optimizedPlans.put(logical, physical);
- }
-
- @Override
- public TemplatizedPhysicalPlan getOptimizedPlan(TemplatizedLogicalPlan logical) {
- return optimizedPlans.get(logical);
- }
-
- @Override
- public void updateLocalQueueLength(int length) {
- workQueueLengths.publish(new WrappedWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
- .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
- }
-
- @Override
- public List<WorkQueueStatus> getQueueLengths() {
- return Lists.newArrayList(endpoints.asMap().keySet());
- }
-
- public class WrappedWorkQueueStatus implements DataSerializable {
-
- public WorkQueueStatus status;
-
- public WrappedWorkQueueStatus(WorkQueueStatus status) {
- this.status = status;
- }
-
- @Override
- public void readData(DataInput arg0) throws IOException {
- int len = arg0.readShort();
- byte[] b = new byte[len];
- arg0.readFully(b);
- this.status = WorkQueueStatus.parseFrom(b);
- }
-
- @Override
- public void writeData(DataOutput arg0) throws IOException {
- byte[] b = status.toByteArray();
- if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
- arg0.writeShort(b.length);
- arg0.write(b);
- }
-
- }
+// @Override
+// public void updateLocalQueueLength(int length) {
+// workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
+// .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
+// }
+//
+// @Override
+// public List<WorkQueueStatus> getQueueLengths() {
+// return Lists.newArrayList(endpoints.asMap().keySet());
+// }
@Override
public void close() throws IOException {
@@ -136,13 +99,13 @@ public class HazelCache implements DistributedCache {
}
@Override
- public PlanFragment getFragment(long fragmentId) {
- throw new UnsupportedOperationException();
+ public PlanFragment getFragment(FragmentHandle handle) {
+ return this.fragments.get(handle);
}
@Override
public void storeFragment(PlanFragment fragment) {
- throw new UnsupportedOperationException();
+ fragments.put(fragment.getHandle(), fragment);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
new file mode 100644
index 0000000..ddb2a02
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+import com.google.common.collect.Maps;
+
+/** DistributedCache implementation that keeps plan fragments in a map held by this object. */
+public class LocalCache implements DistributedCache {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
+
+  // Fragment store keyed by handle; allocated by run() and dropped again by close().
+  private volatile Map<FragmentHandle, PlanFragment> handles;
+
+  @Override
+  public void run() throws DrillbitStartupException {
+    this.handles = Maps.newConcurrentMap();
+  }
+
+  @Override
+  public void storeFragment(PlanFragment fragment) {
+    this.handles.put(fragment.getHandle(), fragment);
+  }
+
+  @Override
+  public PlanFragment getFragment(FragmentHandle handle) {
+    return this.handles.get(handle);
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.handles = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
new file mode 100644
index 0000000..46bb9ee
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
+
+import com.google.protobuf.Parser;
+import com.hazelcast.core.HazelcastInstance;
+
+public class ProtoBufImpl {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
+ // Concrete ProtoBufWrap / ProtoMap subclasses; each one only binds a protobuf message type to its PARSER.
+ public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
+ public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);} // starts with a null value; presumably filled by readData on deserialization -- confirm
+ public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
+ }
+ // Wrapper for FragmentHandle messages; the key wrapper type of HandlePlan below.
+ public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
+ public HFragmentHandle() {super(FragmentHandle.PARSER);}
+ public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
+ }
+ // Wrapper for PlanFragment messages; the value wrapper type of HandlePlan below.
+ public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
+ public HPlanFragment() {super(PlanFragment.PARSER);}
+ public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
+ }
+ // FragmentHandle -> PlanFragment map stored in the Hazelcast map named "plan-fragment-cache".
+ public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
+ public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
+ public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);} // wraps the raw key for map storage
+ public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);} // wraps the raw value for map storage
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
new file mode 100644
index 0000000..c3a9160
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.nio.DataSerializable;
+
+public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
+
+  T value; // wrapped message; null when built through the parser-only constructor
+  final Parser<T> parser;
+
+  public ProtoBufWrap(Parser<T> parser){
+    this(null, parser);
+  }
+
+  public ProtoBufWrap(T value, Parser<T> parser){
+    this.parser = parser;
+    this.value = value;
+  }
+
+  protected T get() {
+    return this.value;
+  }
+
+  protected void set(T value) {
+    this.value = value;
+  }
+
+  // Wire format: a 16-bit length written with writeShort, followed by that many message bytes.
+  @Override
+  public void writeData(DataOutput out) throws IOException {
+    final byte[] payload = value.toByteArray();
+    final int length = payload.length;
+    if (length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
+    out.writeShort(length);
+    out.write(payload);
+  }
+
+  @Override
+  public void readData(DataInput in) throws IOException {
+    final byte[] payload = new byte[in.readShort()];
+    in.readFully(payload);
+    value = parser.parseFrom(payload);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
new file mode 100644
index 0000000..dac8201
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+/**
+ * Typed facade over a Hazelcast {@link IMap} whose keys and values are protobuf messages.
+ * Messages are wrapped in {@link ProtoBufWrap} subclasses (created by {@link #getNewKey} and
+ * {@link #getNewValue}) before being handed to the underlying map.
+ *
+ * @param <K> protobuf key type
+ * @param <V> protobuf value type
+ * @param <HK> wrapper type used for keys in the underlying map
+ * @param <HV> wrapper type used for values in the underlying map
+ */
+public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
+
+  private final IMap<HK, HV> hzMap;
+
+  /**
+   * @param instance Hazelcast instance that supplies the backing map
+   * @param mapName name of the backing map to look up on {@code instance}
+   */
+  public ProtoMap(HazelcastInstance instance, String mapName){
+    hzMap = instance.getMap(mapName);
+  }
+
+  /**
+   * Looks up the value stored under {@code key}.
+   *
+   * @param key the key to look up, must not be null
+   * @return the stored message, or null if the map holds no entry for the key
+   */
+  public V get(K key){
+    Preconditions.checkNotNull(key);
+    HK hk = getNewKey(key);
+    HV hv = hzMap.get(hk);
+    if(hv == null) return null;
+    return hv.get();
+  }
+
+  /**
+   * Stores {@code value} under {@code key}.
+   *
+   * @param key the key, must not be null
+   * @param value the value, must not be null
+   * @return the message previously stored under the key, or null if there was none
+   */
+  public V put(K key, V value){
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+    HV oldValue = hzMap.put(getNewKey(key), getNewValue(value));
+    // The first put for a key has no previous entry, so the map hands back null.
+    if(oldValue == null) return null;
+    return oldValue.get();
+  }
+
+  /** Wraps a key message in the wrapper type used by the underlying map. */
+  public abstract HK getNewKey(K key);
+
+  /** Wraps a value message in the wrapper type used by the underlying map. */
+  public abstract HV getNewValue(V key);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
deleted file mode 100644
index 5ad9ef1..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.cache;
-
-public class TemplatizedLogicalPlan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TemplatizedLogicalPlan.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
deleted file mode 100644
index 643720c..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.cache;
-
-public class TemplatizedPhysicalPlan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TemplatizedPhysicalPlan.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index ee63213..bb7f77e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -25,24 +25,34 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserClient;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.rpc.user.UserRpcConfig;
/**
* Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
*/
-public class DrillClient {
-
+public class DrillClient implements Closeable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
+
DrillConfig config;
private UserClient client;
private ClusterCoordinator clusterCoordinator;
@@ -56,8 +66,17 @@ public class DrillClient {
}
public DrillClient(DrillConfig config) { // Creates a client with no coordinator supplied; delegates with a null coordinator.
+ this(config, null);
+ }
+
+ public DrillClient(DrillConfig config, ClusterCoordinator coordinator){ // coordinator may be null, in which case connect() creates and starts a ZKClusterCoordinator.
this.config = config;
+ this.clusterCoordinator = coordinator;
}
+
+
+
+
/**
* Connects the client to a Drillbit server
@@ -65,9 +84,11 @@ public class DrillClient {
* @throws IOException
*/
public void connect() throws Exception {
- this.clusterCoordinator = new ZKClusterCoordinator(this.config);
- this.clusterCoordinator.start();
- Thread.sleep(10000);
+ if(clusterCoordinator == null){
+ this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+ this.clusterCoordinator.start(10000);
+ }
+
Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints();
checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
// just use the first endpoint for now
@@ -75,7 +96,8 @@ public class DrillClient {
ByteBufAllocator bb = new PooledByteBufAllocator(true);
this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
try {
- this.client.connectAsClient(endpoint.getAddress(), endpoint.getUserPort());
+ logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
+ this.client.connect(endpoint);
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -97,8 +119,37 @@ public class DrillClient {
* @return a handle for the query result
* @throws RpcException
*/
- public DrillRpcFuture<QueryHandle> submitPlan(String plan) throws RpcException {
- return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).setPlan(plan).build(), null);
+ public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
+   try {
+     // The listener records a submission failure and collects every result batch it is handed.
+     ListHoldingResultsListener listener = new ListHoldingResultsListener();
+     Future<Void> f = client.submitQuery(newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(), listener);
+     // Block until the future returned by submitQuery completes.
+     f.get();
+     if(listener.ex != null){
+       throw listener.ex;
+     }else{
+       return listener.results;
+     }
+   } catch (InterruptedException e) {
+     // Restore the interrupt status so code further up the stack can still observe the interruption.
+     Thread.currentThread().interrupt();
+     throw new RpcException(e);
+   } catch (ExecutionException e) {
+     throw new RpcException(e);
+   }
+ }
+
+ private class ListHoldingResultsListener extends UserResultsListener{ // Listener that keeps the submission failure (if any) and every result batch so runQuery can read them.
+ private RpcException ex; // Set by submissionFailed; remains null otherwise.
+ private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>(); // Batches appended by resultArrived, in call order.
+
+ @Override
+ public void submissionFailed(RpcException ex) { // Logs the failure and stores it for runQuery to rethrow.
+ logger.debug("Submission failed.", ex);
+ this.ex = ex;
+ }
+
+ @Override
+ public void resultArrived(QueryResultBatch result) { // Logs the batch (including its last-chunk flag) and appends it to results.
+ logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
+ results.add(result);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index d3580b5..7fb1f5b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.coord;
import java.io.Closeable;
import java.util.Collection;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
/**
* Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
@@ -29,7 +29,12 @@ import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
public abstract class ClusterCoordinator implements Closeable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterCoordinator.class);
- public abstract void start() throws Exception;
+ /**
+ * Start the cluster coordinator, waiting up to the given number of milliseconds for the coordination service to come up.
+ * @param millisToWait The maximum time to wait before throwing an exception if the cluster coordination service has not successfully started. Use 0 to wait indefinitely.
+ * @throws Exception if the cluster coordination service has not successfully started within the allotted time
+ */
+ public abstract void start(long millisToWait) throws Exception;
public abstract RegistrationHandle register(DrillbitEndpoint data);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
index ce0fb92..289aa3c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
@@ -17,9 +17,9 @@
******************************************************************************/
package org.apache.drill.exec.coord;
-import org.apache.drill.common.proto.CoordinationProtos.DrillServiceInstance;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillServiceInstance;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.netflix.curator.x.discovery.ServiceInstance;
import com.netflix.curator.x.discovery.ServiceInstanceBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java
new file mode 100644
index 0000000..5886c2c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+/**
+ * Jackson serializer/deserializer pair for {@link DrillbitEndpoint}. The endpoint is carried as a
+ * single binary JSON value holding the bytes produced by {@code toByteArray()}.
+ */
+public class DrillbitEndpointSerDe {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitEndpointSerDe.class);
+
+  /** Writes a {@link DrillbitEndpoint} as a binary value. */
+  public static class Se extends StdSerializer<DrillbitEndpoint> {
+
+    public Se() {
+      super(DrillbitEndpoint.class);
+    }
+
+    /** Emits the endpoint's byte encoding through {@code jgen.writeBinary}. */
+    @Override
+    public void serialize(DrillbitEndpoint value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+        JsonGenerationException {
+      final byte[] encoded = value.toByteArray();
+      jgen.writeBinary(encoded);
+    }
+
+  }
+
+  /** Reads a {@link DrillbitEndpoint} back from a binary value. */
+  public static class De extends StdDeserializer<DrillbitEndpoint> {
+
+    public De() {
+      super(DrillbitEndpoint.class);
+    }
+
+    /** Takes the parser's current binary value and parses it as a {@link DrillbitEndpoint}. */
+    @Override
+    public DrillbitEndpoint deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+        JsonProcessingException {
+      final byte[] encoded = jp.getBinaryValue();
+      return DrillbitEndpoint.parseFrom(encoded);
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
new file mode 100644
index 0000000..43a5430
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.collect.Maps;
+
+/**
+ * {@link ClusterCoordinator} that keeps its endpoint registrations in a map held by this
+ * instance. Registrations are keyed by a private {@link RegistrationHandle} implementation
+ * identified by a random {@link UUID}.
+ */
+public class LocalClusterCoordinator extends ClusterCoordinator{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
+
+  // Never null: initialised eagerly so register/unregister/getAvailableEndpoints are safe to call
+  // before start() and after close(). start() and close() swap in a fresh, empty map.
+  private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
+
+  /** Discards all registrations. Later calls see an empty set of endpoints rather than failing. */
+  @Override
+  public void close() throws IOException {
+    endpoints = Maps.newConcurrentMap();
+  }
+
+  /**
+   * Starts the coordinator with an empty set of registrations.
+   *
+   * @param millis unused by this implementation
+   */
+  @Override
+  public void start(long millis) throws Exception {
+    logger.debug("Local Cluster Coordinator started.");
+    endpoints = Maps.newConcurrentMap();
+  }
+
+  /**
+   * Records an endpoint under a newly created handle.
+   *
+   * @param data the endpoint to register
+   * @return the handle to pass to {@link #unregister(RegistrationHandle)}
+   */
+  @Override
+  public RegistrationHandle register(DrillbitEndpoint data) {
+    logger.debug("Endpoint registered {}.", data);
+    Handle h = new Handle();
+    endpoints.put(h, data);
+    return h;
+  }
+
+  /** Removes the registration stored under {@code handle}, if any. */
+  @Override
+  public void unregister(RegistrationHandle handle) {
+    endpoints.remove(handle);
+  }
+
+  /** @return the values view of the current registration map */
+  @Override
+  public Collection<DrillbitEndpoint> getAvailableEndpoints() {
+    return endpoints.values();
+  }
+
+
+  /** Registration key; equal only to handles with the same id created by an equal coordinator. */
+  private class Handle implements RegistrationHandle{
+    private final UUID id = UUID.randomUUID();
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + getOuterType().hashCode();
+      result = prime * result + id.hashCode();
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      Handle other = (Handle) obj;
+      if (!getOuterType().equals(other.getOuterType())) return false;
+      return id.equals(other.id);
+    }
+
+    private LocalClusterCoordinator getOuterType() {
+      return LocalClusterCoordinator.this;
+    }
+
+  }
+
+}