You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/16 20:00:46 UTC

[1/2] git commit: Remove extraneous files. Update RPC to use EnumLite and getNumber() instead of ordinal() to fix the situation where protobuf def enum definition skips numbers.

Updated Branches:
  refs/heads/execwork b53933f22 -> f1746c92f


Remove extraneous files.  Update RPC to use EnumLite and getNumber() instead of ordinal() to fix the situation where protobuf def enum definition skips numbers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f3b20193
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f3b20193
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f3b20193

Branch: refs/heads/execwork
Commit: f3b2019365dab6abed84810173593152c9fb3ffa
Parents: b53933f
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon Apr 15 14:42:51 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Apr 15 14:42:51 2013 -0700

----------------------------------------------------------------------
 .../prototype/exec/java-exec/rse/ClasspathRSE.java |   88 -------
 .../prototype/exec/java-exec/rse/ConsoleRSE.java   |   60 -----
 .../exec/java-exec/rse/FileSystemRSE.java          |  144 ------------
 .../exec/java-exec/rse/JSONDataWriter.java         |  142 -----------
 .../exec/java-exec/rse/JSONRecordReader.java       |  183 ---------------
 .../exec/java-exec/rse/OutputStreamWriter.java     |   78 ------
 sandbox/prototype/exec/java-exec/rse/QueueRSE.java |  100 --------
 sandbox/prototype/exec/java-exec/rse/RSEBase.java  |   71 ------
 .../prototype/exec/java-exec/rse/RSERegistry.java  |   85 -------
 .../prototype/exec/java-exec/rse/RecordReader.java |   28 ---
 .../exec/java-exec/rse/RecordRecorder.java         |   32 ---
 .../exec/java-exec/rse/ReferenceStorageEngine.java |   45 ----
 .../org/apache/drill/exec/rpc/BasicClient.java     |    4 +-
 .../org/apache/drill/exec/rpc/BasicServer.java     |    7 +-
 .../apache/drill/exec/rpc/CoordinationQueue.java   |   71 +++---
 .../org/apache/drill/exec/rpc/DrillRpcFuture.java  |    6 +-
 .../apache/drill/exec/rpc/OutboundRpcMessage.java  |    5 +-
 .../java/org/apache/drill/exec/rpc/Response.java   |    5 +-
 .../java/org/apache/drill/exec/rpc/RpcBus.java     |    3 +-
 19 files changed, 56 insertions(+), 1101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java b/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
deleted file mode 100644
index aa8186d..0000000
--- a/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
-import org.apache.drill.exec.ref.rops.ROP;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-public class ClasspathRSE extends RSEBase {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClasspathRSE.class);
-
-  private DrillConfig dConfig;
-  private SchemaPath rootPath;
-  
-  public ClasspathRSE(ClasspathRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
-    this.dConfig = dConfig;
-  }
-
-  
-  @JsonTypeName("classpath")
-  public static class ClasspathRSEConfig extends StorageEngineConfigBase {
-  }
-  
-  public static class ClasspathInputConfig implements ReadEntry{
-    public String path;
-    public ConverterType type;
-    @JsonIgnore public SchemaPath rootPath; 
-  }
-
-  public boolean supportsRead() {
-    return true;
-  }
-
-  @Override
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
-    ClasspathInputConfig c = scan.getSelection().getWith(dConfig, ClasspathInputConfig.class);
-    c.rootPath = scan.getOutputReference();
-    return Collections.singleton((ReadEntry) c);
-  }
-
-  @Override
-  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
-    ClasspathInputConfig e = getReadEntry(ClasspathInputConfig.class, readEntry);
-    URL u = RecordReader.class.getResource(e.path);
-    if(u == null){
-      throw new IOException(String.format("Failure finding classpath resource %s.", e.path));
-    }
-    switch(e.type){
-    case JSON:
-      return new JSONRecordReader(e.rootPath, dConfig, u.openStream(), parentROP);
-    default:
-      throw new UnsupportedOperationException();
-    }
-  }
-  
-  
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java b/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
deleted file mode 100644
index 1570ea9..0000000
--- a/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.OutputStream;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-import org.apache.drill.common.logical.data.Store;
-import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-public class ConsoleRSE extends RSEBase {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConsoleRSE.class);
-  
-  private final DrillConfig dConfig;
-  
-  public static enum Pipe {
-    STD_OUT, STD_ERR
-  };
-
-  public ConsoleRSE(ConsoleRSEConfig engineConfig, DrillConfig dConfig){
-    this.dConfig = dConfig;
-  }
-  
-  public static class ConsoleOutputConfig {
-    public Pipe pipe = Pipe.STD_OUT;
-    public ConverterType type = ConverterType.JSON;
-  }
-  
-  @JsonTypeName("console") public static class ConsoleRSEConfig extends StorageEngineConfigBase {}
-  
-  public boolean supportsWrite() {
-    return true;
-  }
-
-  @Override
-  public RecordRecorder getWriter(Store store) {
-    ConsoleOutputConfig config = store.getTarget().getWith(dConfig, ConsoleOutputConfig.class);
-    OutputStream out = config.pipe == Pipe.STD_OUT ? System.out : System.err;
-    return new OutputStreamWriter(out, config.type, false);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java b/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
deleted file mode 100644
index 522191b..0000000
--- a/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.logical.data.Store;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
-import org.apache.drill.exec.ref.rops.ROP;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-public class FileSystemRSE extends RSEBase {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemRSE.class);
-
-  private FileSystem fs;
-  private Path basePath;
-  private DrillConfig dConfig;
-
-  public FileSystemRSE(FileSystemRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
-    this.dConfig = dConfig;
-    
-    try {
-      URI u = new URI(engineConfig.root);
-      String path = u.getPath();
-      
-      if(path.charAt(path.length()-1) != '/') throw new SetupException(String.format("The file root provided of %s included a file '%s'.  This must be a base path.", engineConfig.root, u.getPath()));
-      fs = FileSystem.get(u, new Configuration());
-      basePath = new Path(u.getPath());
-    } catch (URISyntaxException | IOException e) {
-      throw new SetupException("Failure while reading setting up file system root path.", e);
-    }
-  }
-
-  
-  @JsonTypeName("fs")
-  public static class FileSystemRSEConfig extends StorageEngineConfigBase {
-    private String root;
-
-    @JsonCreator
-    public FileSystemRSEConfig(@JsonProperty("root") String root) {
-      this.root = root;
-    }
-  }
-  
-  public static class FileSystemInputConfig {
-    public FileSpec[] files;
-  }
-  
-  public static class FileSpec{
-    public String path;
-    public ConverterType type;
-  }
-  
-  
-  public class FSEntry implements ReadEntry{
-    Path path;
-    ConverterType type;
-    SchemaPath rootPath;
-
-    public FSEntry(FileSpec spec, SchemaPath rootPath){
-      this.path = new Path(basePath, spec.path);
-      this.type = spec.type;
-      this.rootPath = rootPath;
-    }
-        
-  }
-
-  public class FileSystemOutputConfig {
-    public String file;
-    public ConverterType type;
-  }
-
-  public boolean supportsRead() {
-    return true;
-  }
-  
-  public boolean supportsWrite() {
-    return true;
-  }
-
-  @Override
-  public RecordRecorder getWriter(Store store) throws IOException {
-    FileSystemOutputConfig config = store.getTarget().getWith(dConfig, FileSystemOutputConfig.class);
-    OutputStream out = fs.create(new Path(basePath, config.file));
-    return new OutputStreamWriter(out, config.type, true);
-  }
-
-  @Override
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
-    Set<ReadEntry> s = new HashSet<ReadEntry>();
-    for(FileSpec f : scan.getSelection().getWith(dConfig, FileSystemInputConfig.class).files){
-      s.add(new FSEntry(f, scan.getOutputReference()));
-    }
-    return s;
-  }
-
-  @Override
-  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
-    FSEntry e = getReadEntry(FSEntry.class, readEntry);
-    
-    switch(e.type){
-    case JSON:
-      return new JSONRecordReader(e.rootPath, dConfig, fs.open(e.path), parentROP);
-    default:
-      throw new UnsupportedOperationException();
-    }
-  }
-  
-  
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java b/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
deleted file mode 100644
index 24434d5..0000000
--- a/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.drill.exec.ref.rops.DataWriter;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-
-public class JSONDataWriter implements DataWriter{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONDataWriter.class);
-  
-  private final JsonGenerator g;
-//  private CharSequence transientName;
-  
-  public JSONDataWriter(OutputStream out) throws IOException{
-    JsonFactory f = new JsonFactory();
-    
-    this.g = f.createJsonGenerator(out, JsonEncoding.UTF8);
-    this.g.useDefaultPrettyPrinter();
-  }
-  
-  private String s(CharSequence seq) {
-    String s = (seq instanceof String) ? (String) seq : seq.toString();
-    return s;
-  }
-  
-  @Override
-  public void startRecord() throws IOException {
-    
-  }
-
-  @Override
-  public void writeArrayStart(int length) throws IOException {
-    g.writeStartArray();
-  }
-
-  @Override
-  public void writeArrayElementStart() throws IOException {
-  }
-
-  @Override
-  public void writeArrayElementEnd() throws IOException {
-  }
-
-  @Override
-  public void writeArrayEnd() throws IOException {
-    g.writeEndArray();
-  }
-
-  @Override
-  public void writeMapStart() throws IOException {
-    g.writeStartObject();
-  }
-
-  @Override
-  public void writeMapKey(CharSequence seq) throws IOException {
-    g.writeFieldName(s(seq));
-  }
-
-  @Override
-  public void writeMapValueStart() throws IOException {
-  }
-
-  @Override
-  public void writeMapValueEnd() throws IOException {
-  }
-
-  @Override
-  public void writeMapEnd() throws IOException {
-    g.writeEndObject();
-  }
-
-  @Override
-  public void writeBoolean(boolean b) throws IOException {
-    g.writeBoolean(b);
-  }
-
-  @Override
-  public void writeSInt32(int value) throws IOException {
-    g.writeNumber(value);
-  }
-
-  @Override
-  public void writeSInt64(long value) throws IOException {
-    g.writeNumber(value);
-  }
-
-  @Override
-  public void writeBytes(byte[] bytes) throws IOException {
-    g.writeBinary(bytes);
-  }
-
-  @Override
-  public void writeSFloat64(double value) throws IOException {
-    g.writeNumber(value);
-  }
-
-  @Override
-  public void writeSFloat32(float value) throws IOException {
-    g.writeNumber(value);
-  }
-
-  @Override
-  public void writeNullValue() throws IOException {
-    g.writeNull();
-  }
-
-  @Override
-  public void writeCharSequence(CharSequence value) throws IOException {
-    g.writeString(s(value));
-  }
-
-  @Override
-  public void endRecord() throws IOException {
-    g.writeRawValue("\n");
-  }
-  
-  public void finish() throws IOException{
-    g.close();
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
deleted file mode 100644
index 7510e72..0000000
--- a/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Iterator;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.RunOutcome;
-import org.apache.drill.exec.ref.UnbackedRecord;
-import org.apache.drill.exec.ref.exceptions.RecordException;
-import org.apache.drill.exec.ref.rops.ROP;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.BytesScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.DoubleScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.IntegerScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.StringScalar;
-import org.apache.drill.exec.ref.values.SimpleArrayValue;
-import org.apache.drill.exec.ref.values.SimpleMapValue;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Charsets;
-
-public class JSONRecordReader implements RecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
-
-  private InputStreamReader input;
-  private String file;
-  private SchemaPath rootPath;
-  private JsonParser parser;
-  private UnbackedRecord record = new UnbackedRecord();
-  private ObjectMapper mapper;
-  private ROP parent;
-
-  public JSONRecordReader(SchemaPath rootPath, DrillConfig dConfig, InputStream stream, ROP parent) throws IOException {
-    this.input = new InputStreamReader(stream, Charsets.UTF_8);
-    this.mapper = dConfig.getMapper();
-    this.parser = mapper.getFactory().createJsonParser(input);
-    this.parent = parent;
-    this.rootPath = rootPath;
-  }
-
-  private class NodeIter implements RecordIterator {
-
-    @Override
-    public NextOutcome next() {
-//      logger.debug("Next Record Called");
-      try {
-        if (parser.nextToken() == null) {
-//          logger.debug("No current token, returning.");
-          return NextOutcome.NONE_LEFT;
-        }
-        JsonNode n = mapper.readTree(parser);
-        if (n == null) {
-//          logger.debug("Nothing was returned for read tree, returning.");
-          return NextOutcome.NONE_LEFT;
-        }
-//        logger.debug("Record found, returning new json record.");
-        record.setClearAndSetRoot(rootPath, convert(n));
-        // todo, add schema checking here.
-        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
-      } catch (IOException e) {
-        throw new RecordException("Failure while reading record", null, e);
-      }
-    }
-
-
-    @Override
-    public RecordPointer getRecordPointer() {
-      return record;
-    }
-
-
-    @Override
-    public ROP getParent() {
-      return parent;
-    }
-
-  }
-
-  private DataValue convert(JsonNode node) {
-    if (node == null || node.isNull() || node.isMissingNode()) {
-      return DataValue.NULL_VALUE;
-    } else if (node.isArray()) {
-      SimpleArrayValue arr = new SimpleArrayValue(node.size());
-      for (int i = 0; i < node.size(); i++) {
-        arr.addToArray(i, convert(node.get(i)));
-      }
-      return arr;
-    } else if (node.isObject()) {
-      SimpleMapValue map = new SimpleMapValue();
-      String name;
-      for (Iterator<String> iter = node.fieldNames(); iter.hasNext();) {
-        name = iter.next();
-        map.setByName(name, convert(node.get(name)));
-      }
-      return map;
-    } else if (node.isBinary()) {
-      try {
-        return new BytesScalar(node.binaryValue());
-      } catch (IOException e) {
-        throw new RuntimeException("Failure converting binary value.", e);
-      }
-    } else if (node.isBigDecimal()) {
-      throw new UnsupportedOperationException();
-//      return new BigDecimalScalar(node.decimalValue());
-    } else if (node.isBigInteger()) {
-      throw new UnsupportedOperationException();
-//      return new BigIntegerScalar(node.bigIntegerValue());
-    } else if (node.isBoolean()) {
-      return new BooleanScalar(node.asBoolean());
-    } else if (node.isFloatingPointNumber()) {
-      if (node.isBigDecimal()) {
-        throw new UnsupportedOperationException();
-//        return new BigDecimalScalar(node.decimalValue());
-      } else {
-        return new DoubleScalar(node.asDouble());
-      }
-    } else if (node.isInt()) {
-      return new IntegerScalar(node.asInt());
-    } else if (node.isLong()) {
-      return new LongScalar(node.asLong());
-    } else if (node.isTextual()) {
-      return new StringScalar(node.asText());
-    } else {
-      throw new UnsupportedOperationException(String.format("Don't know how to convert value of type %s.", node
-          .getClass().getCanonicalName()));
-    }
-
-  }
-
-  
-  /* (non-Javadoc)
-   * @see org.apache.drill.exec.ref.rse.DataReader#getIterator()
-   */
-  @Override
-  public RecordIterator getIterator() {
-    return new NodeIter();
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.drill.exec.ref.rse.DataReader#cleanup()
-   */
-  @Override
-  public void cleanup() {
-    try {
-      parser.close();
-      this.input.close();
-    } catch (IOException e) {
-      logger.warn("Error while closing InputStream for file {}", file, e);
-    }
-
-  }
-
-
-  @Override
-  public void setup() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java b/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
deleted file mode 100644
index 20d5b8f..0000000
--- a/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
-import org.apache.drill.exec.ref.rops.DataWriter;
-import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-public class OutputStreamWriter implements RecordRecorder{
-  
-  private OutputStream stream;
-  private FSDataOutputStream posStream;
-  private DataWriter writer;
-  private ConverterType type;
-  private boolean closeStream;
-  
-  public OutputStreamWriter(OutputStream stream, ConverterType type, boolean closeStream){
-    this.stream = stream;
-    this.closeStream = closeStream;
-    this.type = type;
-    if(stream instanceof FSDataOutputStream) posStream = (FSDataOutputStream) stream;
-  }
-
-  @Override
-  public void setup() throws IOException {
-    DataWriter w = null;
-    switch(type){
-    case JSON:
-      w = new JSONDataWriter(stream);
-      break;
-    default:
-      throw new UnsupportedOperationException();
-    }
-    this.writer = w;
-  }
-  
-  private long getPos() throws IOException{
-    if(posStream == null) return 0;
-    return posStream.getPos();
-  }
-
-  @Override
-  public long recordRecord(RecordPointer pointer) throws IOException {
-    pointer.write(writer);
-    return getPos();
-  }
-
-  @Override
-  public void finish(OutcomeType outcome) throws IOException {
-    writer.finish();
-    if(closeStream){
-      stream.close();
-    }else{
-      stream.flush();
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/QueueRSE.java b/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
deleted file mode 100644
index 9a0a132..0000000
--- a/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-import org.apache.drill.common.logical.data.Store;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-public class QueueRSE extends RSEBase {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueRSE.class);
-
-  private DrillConfig dConfig;
-  private final List<Queue<Object>> sinkQueues;
-  
-  public QueueRSE(QueueRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
-    this.dConfig = dConfig;
-    sinkQueues = Collections.singletonList( (Queue<Object>) (new ArrayBlockingQueue<Object>(100)));
-  }
-
-  public Queue<Object> getQueue(int number){
-    return sinkQueues.get(number);
-  }
-  
-  @JsonTypeName("queue") public static class QueueRSEConfig extends StorageEngineConfigBase {}
-  
-  public static class QueueOutputInfo{
-    public int number;
-  }
-
-  public boolean supportsWrite() {
-    return true;
-  }
-
-  
-  @Override
-  public RecordRecorder getWriter(Store store) throws IOException {
-    QueueOutputInfo config = store.getTarget().getWith(dConfig, QueueOutputInfo.class);
-    Queue<Object> q = dConfig.getQueue(config.number);
-    return new QueueRecordRecorder(q);
-  }
-
-  
-  private class QueueRecordRecorder implements RecordRecorder{
-
-    private final Queue<Object> queue;
-    
-    public QueueRecordRecorder(Queue<Object> queue) {
-      this.queue = queue;
-    }
-
-    @Override
-    public void setup() throws IOException {
-    }
-
-    @Override
-    public long recordRecord(RecordPointer r) throws IOException {
-      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      final JSONDataWriter writer = new JSONDataWriter(baos);
-      r.write(writer);
-      writer.finish();
-      queue.add(baos.toByteArray());
-      return 0;
-    }
-
-    @Override
-    public void finish(OutcomeType type) throws IOException {
-      queue.add(type);
-    }
-    
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/RSEBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RSEBase.java b/sandbox/prototype/exec/java-exec/rse/RSEBase.java
deleted file mode 100644
index 3f86c98..0000000
--- a/sandbox/prototype/exec/java-exec/rse/RSEBase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.logical.data.Store;
-import org.apache.drill.common.util.PathScanner;
-import org.apache.drill.exec.ref.ExecRefConstants;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.exceptions.MajorException;
-import org.apache.drill.exec.ref.rops.ROP;
-
-import com.typesafe.config.Config;
-
-public abstract class RSEBase implements ReferenceStorageEngine{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSEBase.class);
-  
-  @Override
-  public boolean supportsRead() {
-    return false;
-  }
-
-  @Override
-  public boolean supportsWrite() {
-    return false;
-  }
-
-  @Override
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
-    throw new UnsupportedOperationException(String.format("%s does not support reads.", this.getClass().getCanonicalName()));
-  }
-
-  @Override
-  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
-    throw new UnsupportedOperationException(String.format("%s does not support reads.", this.getClass().getCanonicalName()));
-  }
-
-  @Override
-  public RecordRecorder getWriter(Store store) throws IOException {
-    throw new UnsupportedOperationException(String.format("%s does not support writes.", this.getClass().getCanonicalName()));
-  }
-  
-  public static Class<?>[] getSubTypes(Config config){
-    Collection<Class<? extends ReferenceStorageEngine>> engines = PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-    return engines.toArray(new Class<?>[engines.size()]);
-  }
-
-  @SuppressWarnings("unchecked")
-  protected <T extends ReadEntry> T getReadEntry(Class<T> c, ReadEntry entry){
-    if(!c.isAssignableFrom(entry.getClass())) throw new MajorException(String.format("Expected entry type was invalid.  Expected entry of type %s but received type of %s.", c.getCanonicalName(), entry.getClass().getCanonicalName()));
-    return (T) entry;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RSERegistry.java b/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
deleted file mode 100644
index 4266aac..0000000
--- a/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.util.PathScanner;
-import org.apache.drill.exec.ref.ExecRefConstants;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-
-import com.typesafe.config.Config;
-
-public class RSERegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSERegistry.class);
-  
-  private Map<Object, Constructor<? extends ReferenceStorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends ReferenceStorageEngine>>();
-  private Map<StorageEngineConfig, ReferenceStorageEngine> activeEngines = new HashMap<StorageEngineConfig, ReferenceStorageEngine>();
-  private DrillConfig config;
-  
-  public RSERegistry(DrillConfig config){
-    this.config = config;
-    setup(config);
-  }
-  
-  @SuppressWarnings("unchecked")
-  public void setup(DrillConfig config){
-    Collection<Class<? extends ReferenceStorageEngine>> engines = PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-    logger.debug("Loading storage engines {}", engines);
-    for(Class<? extends ReferenceStorageEngine> engine: engines){
-      int i =0;
-      for(Constructor<?> c : engine.getConstructors()){
-        Class<?>[] params = c.getParameterTypes();
-        if(params.length != 2 || params[1] == Config.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
-          logger.debug("Skipping ReferenceStorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, Config)]", c, engine);
-          continue;
-        }
-        availableEngines.put(params[0], (Constructor<? extends ReferenceStorageEngine>) c);
-        i++;
-      }
-      if(i == 0){
-        logger.debug("Skipping registration of ReferenceStorageEngine {} as it doesn't have a constructor with the parameters of (StorangeEngineConfig, Config)", engine.getCanonicalName());
-      }
-    }
-  }
-  
-  public ReferenceStorageEngine getEngine(StorageEngineConfig engineConfig) throws SetupException{
-    ReferenceStorageEngine engine = activeEngines.get(engineConfig);
-    if(engine != null) return engine;
-    Constructor<? extends ReferenceStorageEngine> c = availableEngines.get(engineConfig.getClass());
-    if(c == null) throw new SetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
-    try {
-      return c.newInstance(engineConfig, config);
-    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
-      if(t instanceof SetupException) throw ((SetupException) t);
-      throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
-    }
-  }
-  
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/RecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RecordReader.java b/sandbox/prototype/exec/java-exec/rse/RecordReader.java
deleted file mode 100644
index b7840bc..0000000
--- a/sandbox/prototype/exec/java-exec/rse/RecordReader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import org.apache.drill.exec.ref.RecordIterator;
-
-public interface RecordReader {
-
-  public abstract RecordIterator getIterator();
-  public abstract void setup();
-  public abstract void cleanup();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java b/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
deleted file mode 100644
index 9527b0b..0000000
--- a/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.RunOutcome;
-
-public interface RecordRecorder {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRecorder.class);
-  
-  public void setup() throws IOException;
-  public long recordRecord(RecordPointer pointer) throws IOException;
-  public void finish(RunOutcome.OutcomeType outcome) throws IOException;
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java b/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java
deleted file mode 100644
index 41cba45..0000000
--- a/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ref.rse;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.logical.data.Store;
-import org.apache.drill.exec.ref.rops.ROP;
-
-
-public interface ReferenceStorageEngine {
-  public boolean supportsRead();
-  public boolean supportsWrite();
-
-  public enum PartitionCapabilities {
-    NONE, HASH, SORTED;
-  }
-
-  public enum MemoryFormat {
-    RECORD, FIELD;
-  }
-
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
-  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException;
-  public RecordRecorder getWriter(Store store) throws IOException;
-
-  public interface ReadEntry{}
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index aa42fc1..c62d445 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -26,9 +26,9 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.protobuf.Internal.EnumLite;
 
-public abstract class BasicClient<T extends Enum<T>> extends RpcBus<T> {
+public abstract class BasicClient<T extends EnumLite> extends RpcBus<T> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
   private Bootstrap b;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index acf1822..a75dee0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInboundMessageHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -28,17 +26,18 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
 
 import org.apache.drill.exec.exception.DrillbitStartupException;
 
+import com.google.protobuf.Internal.EnumLite;
+
 /**
  * A server is bound to a port and is responsible for responding to various type of requests. In some cases, the inbound
  * requests will generate more than one outbound request.
  */
-public abstract class BasicServer<T extends Enum<T>> extends RpcBus<T>{
+public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
 
   private ServerBootstrap b;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index a924359..c796e2d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -22,65 +22,70 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
 
-import com.google.common.util.concurrent.MoreExecutors;
-
 /**
  * Manages the creation of rpc futures for a particular socket.
  */
-public class CoordinationQueue{
+public class CoordinationQueue {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
-  
+
   private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
   private final Map<Integer, DrillRpcFuture<?>> map;
-  
-  
-  public CoordinationQueue(int segmentSize, int segmentCount){
+
+  public CoordinationQueue(int segmentSize, int segmentCount) {
     map = new ConcurrentHashMap<Integer, DrillRpcFuture<?>>(segmentSize, 0.75f, segmentCount);
   }
-  
-  void channelClosed(Exception ex){
-    for(DrillRpcFuture<?> f : map.values()){
+
+  void channelClosed(Exception ex) {
+    for (DrillRpcFuture<?> f : map.values()) {
       f.setException(ex);
     }
   }
-  
-  public <V> DrillRpcFuture<V> getNewFuture(Class<V> clazz){
+
+  public <V> DrillRpcFuture<V> getNewFuture(Class<V> clazz) {
     int i = circularInt.getNext();
     DrillRpcFuture<V> future = DrillRpcFuture.getNewFuture(i, clazz);
-//    logger.debug("Writing to map coord {}, future {}", i, future);
-    Object old = map.put(i,  future);
-    if(old != null) throw new IllegalStateException("You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
+    // logger.debug("Writing to map coord {}, future {}", i, future);
+    Object old = map.put(i, future);
+    if (old != null)
+      throw new IllegalStateException(
+          "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
     return future;
   }
 
-  private DrillRpcFuture<?> removeFromMap(int coordinationId){
+  private DrillRpcFuture<?> removeFromMap(int coordinationId) {
     DrillRpcFuture<?> rpc = map.remove(coordinationId);
-    if(rpc == null){
+    if (rpc == null) {
       logger.error("Rpc is null.");
-      throw new IllegalStateException("Attempting to retrieve an rpc that wasn't first stored in the rpc coordination queue.  This would most likely happen if you're opposite endpoint sent the multiple messages on the same coordination id.");
+      throw new IllegalStateException(
+          "Attempting to retrieve an rpc that wasn't first stored in the rpc coordination queue.  This would most likely happen if you're opposite endpoint sent multiple messages on the same coordination id.");
     }
     return rpc;
   }
-  
-  public <V> DrillRpcFuture<V> getFuture(int coordinationId, Class<V> clazz){
-//    logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
+
+  public <V> DrillRpcFuture<V> getFuture(int coordinationId, Class<V> clazz) {
+    // logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
     DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
-//    logger.debug("Got rpc from map {}", rpc);
-    
-    if(rpc.clazz != clazz){
-      logger.error("Rpc class is not expected class {}", rpc.clazz, clazz);
-      throw new IllegalStateException("You attempted to request a future for a coordination id that has a different value class than was used when you initially created the coordination id.  This shouldn't happen.");
+    // logger.debug("Got rpc from map {}", rpc);
+    Class<?> outcomeClass = rpc.getOutcomeClass();
+    if (outcomeClass != clazz) {
+      logger.error("Rpc class is not expected class. Original: {}, requested: {}", outcomeClass.getCanonicalName(), clazz.getCanonicalName());
+      throw new IllegalStateException(
+          String
+              .format(
+                  "You attempted to request a future for a coordination id that has a different value class than was used when you "
+                      + "initially created the coordination id.  Requested class %s, originally expected class %s.  This shouldn't happen.  ",
+                  clazz.getCanonicalName(), outcomeClass.getCanonicalName()));
     }
-    
+
     @SuppressWarnings("unchecked")
-    DrillRpcFuture<V> crpc = (DrillRpcFuture<V>) rpc; 
-    
-//    logger.debug("Returning casted future");
+    DrillRpcFuture<V> crpc = (DrillRpcFuture<V>) rpc;
+
+    // logger.debug("Returning casted future");
     return crpc;
   }
-  
-  public void updateFailedFuture(int coordinationId, RpcFailure failure){
-//    logger.debug("Updating failed future.");
+
+  public void updateFailedFuture(int coordinationId, RpcFailure failure) {
+    // logger.debug("Updating failed future.");
     DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
     rpc.setException(new RemoteRpcException(failure));
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index 5a2fd93..9a4a7f7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -27,7 +27,7 @@ public class DrillRpcFuture<V> extends AbstractCheckedFuture<V, RpcException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
 
   final int coordinationId;
-  final Class<V> clazz;
+  private final Class<V> clazz;
 
   public DrillRpcFuture(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
     super(delegate);
@@ -35,6 +35,10 @@ public class DrillRpcFuture<V> extends AbstractCheckedFuture<V, RpcException> {
     this.clazz = clazz;
   }
 
+  public Class<V> getOutcomeClass(){
+    return clazz;
+  }
+  
   /**
    * Drill doesn't currently support rpc cancellations since nearly all requests should be either instance of
    * asynchronous. Business level cancellation is managed a separate call (e.g. canceling a query.). Calling this method

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
index bb7644e..91c3d45 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 
 class OutboundRpcMessage extends RpcMessage{
@@ -28,8 +29,8 @@ class OutboundRpcMessage extends RpcMessage{
 
   final MessageLite pBody;
   
-  public OutboundRpcMessage(RpcMode mode, Enum<?> rpcType, int coordinationId, MessageLite pBody, ByteBuf dBody) {
-    super(mode, rpcType.ordinal(), coordinationId, dBody);
+  public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf dBody) {
+    super(mode, rpcType.getNumber(), coordinationId, dBody);
     this.pBody = pBody;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
index 8a2f48d..4bd592b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
@@ -19,16 +19,17 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
 
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 
 public class Response {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Response.class);
   
-  public Enum<?> rpcType;
+  public EnumLite rpcType;
   public MessageLite pBody;
   public ByteBuf dBody;
   
-  public Response(Enum<?> rpcType, MessageLite pBody, ByteBuf dBody) {
+  public Response(EnumLite rpcType, MessageLite pBody, ByteBuf dBody) {
     super();
     this.rpcType = rpcType;
     this.pBody = pBody;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3b20193/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 760bd30..76300d1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
@@ -40,7 +41,7 @@ import com.google.protobuf.Parser;
  * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a system.
  * @param <T>
  */
-public abstract class RpcBus<T extends Enum<T>> implements Closeable{
+public abstract class RpcBus<T extends EnumLite> implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcBus.class);
   
   private CoordinationQueue queue = new CoordinationQueue(16, 16);


[2/2] git commit: Fix NPE, add runbit script to start up Drilbit.

Posted by ja...@apache.org.
Fix NPE, add runbit script to start up Drilbit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1746c92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1746c92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1746c92

Branch: refs/heads/execwork
Commit: f1746c92ffbd74ef9622af294768372fd1676459
Parents: f3b2019
Author: Jacques Nadeau <ja...@apache.org>
Authored: Tue Apr 16 10:59:49 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Apr 16 10:59:49 2013 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/service/ServiceEngine.java   |    5 +++--
 .../prototype/exec/java-exec/src/test/sh/runbit    |    9 +++++++++
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1746c92/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 97db72e..5d83bdb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -43,9 +43,10 @@ public class ServiceEngine implements Closeable{
   DrillbitContext context;
   
   public ServiceEngine(DrillbitContext context){
+    this.context = context;
     ByteBufAllocator allocator = context.getAllocator().getUnderlyingAllocator();
-    userServer = new UserServer(allocator, new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), context);
-    bitCom = new BitComImpl(context);
+    this.userServer = new UserServer(allocator, new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), context);
+    this.bitCom = new BitComImpl(context);
   }
   
   public void start() throws DrillbitStartupException, InterruptedException{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1746c92/sandbox/prototype/exec/java-exec/src/test/sh/runbit
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/sh/runbit b/sandbox/prototype/exec/java-exec/src/test/sh/runbit
new file mode 100755
index 0000000..10fc1d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/sh/runbit
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+PROJECT_ROOT=../../../
+
+mvn dependency:build-classpath -f=$PROJECT_ROOT/pom.xml -Dmdep.outputFile=target/sh/cp.txt
+CP=`cat $PROJECT_ROOT/target/sh/cp.txt`
+CP=$CP:$PROJECT_ROOT/target/classes:$PROJECT_ROOT/target/test-classes
+java -cp $CP org.apache.drill.exec.server.Drillbit
+