Posted to commits@hive.apache.org by pr...@apache.org on 2018/05/02 16:44:59 UTC

[3/7] hive git commit: HIVE-19211: New streaming ingest API and support for dynamic partitioning (Prasanth Jayachandran reviewed by Eugene Koifman)

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java b/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
deleted file mode 100644
index 0011b14..0000000
--- a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class InvalidColumn extends StreamingException {
-
-  public InvalidColumn(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java b/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
deleted file mode 100644
index f1f9804..0000000
--- a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class InvalidPartition extends StreamingException {
-
-  public InvalidPartition(String partitionName, String partitionValue) {
-    super("Invalid partition: Name=" + partitionName +
-            ", Value=" + partitionValue);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
index ef1c91d..5c60160 100644
--- a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
@@ -24,15 +24,11 @@ public class InvalidTable extends StreamingException {
     return "Invalid table db:" + db + ", table:" + table;
   }
 
-  public InvalidTable(String db, String table) {
-    super(makeMsg(db,table), null);
-  }
-
-  public InvalidTable(String db, String table, String msg) {
+  InvalidTable(String db, String table, String msg) {
     super(makeMsg(db, table) + ": " + msg, null);
   }
 
-  public InvalidTable(String db, String table, Exception inner) {
+  InvalidTable(String db, String table, Exception inner) {
     super(makeMsg(db, table) + ": " + inner.getMessage(), inner);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
new file mode 100644
index 0000000..9d92dfa
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTransactionState extends TransactionError {
+  InvalidTransactionState(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
deleted file mode 100644
index 762f5f8..0000000
--- a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class InvalidTrasactionState extends TransactionError {
-  public InvalidTrasactionState(String msg) {
-    super(msg);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
index 5f9aca6..e464399 100644
--- a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
@@ -19,7 +19,7 @@
 package org.apache.hive.streaming;
 
 public class PartitionCreationFailed extends StreamingException {
-  public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
-    super("Failed to create partition " + endPoint, cause);
+  PartitionCreationFailed(StreamingConnection connection, Throwable cause) {
+    super("Failed to create partition " + connection, cause);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java b/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java
new file mode 100644
index 0000000..1b0e7de
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.List;
+
+public interface PartitionHandler {
+
+  /**
+   * Creates a partition if it does not exist.
+   *
+   * @param partitionValues - partition values
+   * @return partition location
+   * @throws StreamingException - any metastore related exceptions
+   */
+  PartitionInfo createPartitionIfNotExists(List<String> partitionValues) throws StreamingException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
new file mode 100644
index 0000000..ce9f76a
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+/**
+ * Simple wrapper class for minimal partition related information used by streaming ingest.
+ */
+public class PartitionInfo {
+  private String name;
+  private String partitionLocation;
+  private boolean exists;
+
+  public PartitionInfo(final String name, final String partitionLocation, final boolean exists) {
+    this.name = name;
+    this.partitionLocation = partitionLocation;
+    this.exists = exists;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(final String name) {
+    this.name = name;
+  }
+
+  public String getPartitionLocation() {
+    return partitionLocation;
+  }
+
+  public void setPartitionLocation(final String partitionLocation) {
+    this.partitionLocation = partitionLocation;
+  }
+
+  public boolean isExists() {
+    return exists;
+  }
+
+  public void setExists(final boolean exists) {
+    this.exists = exists;
+  }
+}
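
For context, a minimal sketch of how the new partition contract is meant to be used. StreamingConnection (further down in this patch) extends PartitionHandler, so an open connection can create partitions on demand. The sketch assumes a "connection" variable holding an already-built streaming connection and a caller that propagates StreamingException:

    // Sketch only: connection is an already-open StreamingConnection,
    // which implements PartitionHandler.
    PartitionInfo info = connection.createPartitionIfNotExists(
        java.util.Arrays.asList("2018", "05"));
    if (!info.isExists()) {
      // The partition did not exist before and was created by this call.
      System.out.println("Created " + info.getName() + " at "
          + info.getPartitionLocation());
    }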

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java b/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
deleted file mode 100644
index ccd3ae0..0000000
--- a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class QueryFailedException extends StreamingException {
-  String query;
-
-  public QueryFailedException(String query, Exception e) {
-    super("Query failed: " + query + ". Due to :" + e.getMessage(), e);
-    this.query = query;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
index dc6d70e..4d25924 100644
--- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -19,25 +19,44 @@
 package org.apache.hive.streaming;
 
 
+import java.util.Set;
+
 public interface RecordWriter {
 
-  /** Writes using a hive RecordUpdater
+  /**
+   * Initialize record writer.
+   *
+   * @param connection - streaming connection
+   * @param minWriteId - min write id
+   * @param maxWriteID - max write id
+   * @throws StreamingException - thrown when initialization failed
+   */
+  void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException;
+
+  /**
+   * Writes using a hive RecordUpdater
    *
    * @param writeId the write ID of the table mapping to Txn in which the write occurs
-   * @param record the record to be written
+   * @param record  the record to be written
    */
   void write(long writeId, byte[] record) throws StreamingException;
 
-  /** Flush records from buffer. Invoked by TransactionBatch.commit() */
+  /**
+   * Flush records from buffer. Invoked by TransactionBatch.commitTransaction()
+   */
   void flush() throws StreamingException;
 
-  /** Clear bufferred writes. Invoked by TransactionBatch.abort() */
-  void clear() throws StreamingException;
-
-  /** Acquire a new RecordUpdater. Invoked when
-   * StreamingConnection.fetchTransactionBatch() is called */
-  void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException;
+  /**
+   * Close the RecordUpdater. Invoked by TransactionBatch.close()
+   *
+   * @throws StreamingException - thrown when record writer cannot be closed.
+   */
+  void close() throws StreamingException;
 
-  /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
-  void closeBatch() throws StreamingException;
+  /**
+   * Get the set of partitions that were added by the record writer.
+   *
+   * @return - set of partitions
+   */
+  Set<String> getPartitions();
 }
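
The reworked lifecycle is connection-driven: the connection calls init() with the write id range when it opens, write() once per record, flush() on commitTransaction(), and close() when the connection closes. A minimal implementation skeleton, for illustration only (the writers shipped in this patch extend AbstractRecordWriter rather than implementing the interface directly):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hive.streaming.RecordWriter;
    import org.apache.hive.streaming.StreamingConnection;
    import org.apache.hive.streaming.StreamingException;

    // Illustrative skeleton of the new RecordWriter lifecycle.
    class SkeletonRecordWriter implements RecordWriter {
      private final Set<String> partitions = new HashSet<>();

      @Override
      public void init(StreamingConnection connection, long minWriteId,
          long maxWriteID) throws StreamingException {
        // Acquire resources scoped to the [minWriteId, maxWriteID] range.
      }

      @Override
      public void write(long writeId, byte[] record) throws StreamingException {
        // Decode the record and hand it to a RecordUpdater for writeId;
        // track any dynamically added partition in the partitions set.
      }

      @Override
      public void flush() throws StreamingException {
        // Invoked on commitTransaction(): push buffered writes to storage.
      }

      @Override
      public void close() throws StreamingException {
        // Invoked when the connection closes: release all updaters.
      }

      @Override
      public Set<String> getPartitions() {
        return Collections.unmodifiableSet(partitions);
      }
    }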

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/SerializationError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/SerializationError.java b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
index a57ba00..1473ff8 100644
--- a/streaming/src/java/org/apache/hive/streaming/SerializationError.java
+++ b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
@@ -20,7 +20,7 @@ package org.apache.hive.streaming;
 
 
 public class SerializationError extends StreamingException {
-  public SerializationError(String msg, Exception e) {
+  SerializationError(String msg, Exception e) {
     super(msg,e);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index 2f760ea..cd7f3d8 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -18,40 +18,47 @@
 
 package org.apache.hive.streaming;
 
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.conf.HiveConf;
 
-/**
- * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
- * Note: the expectation is that there is at most 1 TransactionBatch outstanding for any given
- * StreamingConnection.  Violating this may result in "out of sequence response".
- */
-public interface StreamingConnection {
+public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
+  /**
+   * Returns hive configuration object used during connection creation.
+   *
+   * @return - hive conf
+   */
+  HiveConf getHiveConf();
+
+  /**
+   * Begin a transaction for writing.
+   *
+   * @throws StreamingException - if there are errors when beginning transaction
+   */
+  void beginTransaction() throws StreamingException;
+
+  /**
+   * Write record using RecordWriter.
+   *
+   * @param record - the data to be written
+   * @throws StreamingException - if there are errors when writing
+   */
+  void write(byte[] record) throws StreamingException;
 
   /**
-   * Acquires a new batch of transactions from Hive.
-
-   * @param numTransactionsHint is a hint from client indicating how many transactions client needs.
-   * @param writer  Used to write record. The same writer instance can
-   *                      be shared with another TransactionBatch (to the same endpoint)
-   *                      only after the first TransactionBatch has been closed.
-   *                      Writer will be closed when the TransactionBatch is closed.
-   * @return
-   * @throws ConnectionError
-   * @throws InvalidPartition
-   * @throws StreamingException
-   * @return a batch of transactions
+   * Commit a transaction to make the writes visible for readers.
+   *
+   * @throws StreamingException - if there are errors when committing the open transaction
    */
-  public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
-                                                RecordWriter writer)
-          throws ConnectionError, StreamingException, InterruptedException;
+  void commitTransaction() throws StreamingException;
 
   /**
-   * Close connection
+   * Manually abort the opened transaction.
+   *
+   * @throws StreamingException - if there are errors when aborting the transaction
    */
-  public void close();
+  void abortTransaction() throws StreamingException;
 
   /**
-   * @return UserGroupInformation associated with this connection or {@code null} if there is none
+   * Closes streaming connection.
    */
-  UserGroupInformation getUserGroupInformation();
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StreamingException.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingException.java b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
index a7f84c1..1af5c6a 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingException.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
@@ -19,7 +19,7 @@
 package org.apache.hive.streaming;
 
 public class StreamingException extends Exception {
-  public StreamingException(String msg, Exception cause) {
+  public StreamingException(String msg, Throwable cause) {
     super(msg, cause);
   }
   public StreamingException(String msg) {

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
index 0dfbfa7..090167d 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
@@ -21,11 +21,11 @@ package org.apache.hive.streaming;
 
 public class StreamingIOFailure extends StreamingException {
 
-  public StreamingIOFailure(String msg, Exception cause) {
+  StreamingIOFailure(String msg, Exception cause) {
     super(msg, cause);
   }
 
-  public StreamingIOFailure(String msg) {
+  StreamingIOFailure(String msg) {
     super(msg);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
new file mode 100644
index 0000000..4a07435
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import java.util.Properties;
+
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Streaming Writer handles delimited input (e.g. CSV).
+ * Delimited input is parsed to extract partition values, bucketing info and is forwarded to record updater.
+ * Uses Lazy Simple SerDe to process delimited input.
+ *
+ * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
+ */
+public class StrictDelimitedInputWriter extends AbstractRecordWriter {
+  private char fieldDelimiter;
+  private char collectionDelimiter;
+  private char mapKeyDelimiter;
+  private LazySimpleSerDe serde;
+
+  private StrictDelimitedInputWriter(Builder builder) {
+    this.fieldDelimiter = builder.fieldDelimiter;
+    this.collectionDelimiter = builder.collectionDelimiter;
+    this.mapKeyDelimiter = builder.mapKeyDelimiter;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private char fieldDelimiter = (char) LazySerDeParameters.DefaultSeparators[0];
+    private char collectionDelimiter = (char) LazySerDeParameters.DefaultSeparators[1];
+    private char mapKeyDelimiter = (char) LazySerDeParameters.DefaultSeparators[2];
+
+    public Builder withFieldDelimiter(final char fieldDelimiter) {
+      this.fieldDelimiter = fieldDelimiter;
+      return this;
+    }
+
+    public Builder withCollectionDelimiter(final char collectionDelimiter) {
+      this.collectionDelimiter = collectionDelimiter;
+      return this;
+    }
+
+    public Builder withMapKeyDelimiter(final char mapKeyDelimiter) {
+      this.mapKeyDelimiter = mapKeyDelimiter;
+      return this;
+    }
+
+    public StrictDelimitedInputWriter build() {
+      return new StrictDelimitedInputWriter(this);
+    }
+  }
+
+  @Override
+  public Object encode(byte[] record) throws SerializationError {
+    try {
+      BytesWritable blob = new BytesWritable();
+      blob.set(record, 0, record.length);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+  @Override
+  public LazySimpleSerDe createSerde() throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+      tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
+      tableProps.setProperty(serdeConstants.FIELD_DELIM, String.valueOf(fieldDelimiter));
+      tableProps.setProperty(serdeConstants.COLLECTION_DELIM, String.valueOf(collectionDelimiter));
+      tableProps.setProperty(serdeConstants.MAPKEY_DELIM, String.valueOf(mapKeyDelimiter));
+      LazySimpleSerDe serde = new LazySimpleSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      this.serde = serde;
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde", e);
+    }
+  }
+}
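
A short usage sketch of the builder above; the delimiters shown are illustrative, and any delimiter left unset falls back to LazySerDeParameters.DefaultSeparators:

    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .withCollectionDelimiter('|')
        .withMapKeyDelimiter(':')
        .build();
    // The connection drives init() and createSerde(); callers only build
    // the writer and hand it to the connection.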

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 0077913..1600e7c 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -18,131 +18,45 @@
 
 package org.apache.hive.streaming;
 
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Table;
+import java.util.Properties;
+
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.JsonSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
-import org.apache.hive.hcatalog.data.JsonSerDe;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
 
 /**
  * Streaming Writer handles utf8 encoded Json (Strict syntax).
- * Uses org.apache.hive.hcatalog.data.JsonSerDe to process Json input
+ * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input
+ *
+ * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
  */
 public class StrictJsonWriter extends AbstractRecordWriter {
   private JsonSerDe serde;
 
-  private final HCatRecordObjectInspector recordObjInspector;
-  private final ObjectInspector[] bucketObjInspectors;
-  private final StructField[] bucketStructFields;
-
-  /**
-   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
-   */
-  public StrictJsonWriter(HiveEndPoint endPoint)
-    throws ConnectionError, SerializationError, StreamingException {
-    this(endPoint, null, null);
-  }
-
-  /**
-   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
-   */
-  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
-    this(endPoint, conf, null);
-  }
-  /**
-   * @param endPoint the end point to write to
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
-          throws ConnectionError, SerializationError, StreamingException {
-    this(endPoint, null, conn);
+  public static Builder newBuilder() {
+    return new Builder();
   }
-  /**
-   * @param endPoint the end point to write to
-   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
-   * @param conn connection this Writer is to be used with
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
-          throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, conf, conn);
-    this.serde = createSerde(tbl, conf);
-    // get ObjInspectors for entire record and bucketed cols
-    try {
-      recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
-      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
-    } catch (SerDeException e) {
-      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
-    }
 
-    // get StructFields for bucketed cols
-    bucketStructFields = new StructField[bucketIds.size()];
-    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
-    for (int i = 0; i < bucketIds.size(); i++) {
-      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+  public static class Builder {
+    public StrictJsonWriter build() {
+      return new StrictJsonWriter();
     }
   }
 
-  @Override
-  public AbstractSerDe getSerde() {
-    return serde;
-  }
-
-  protected HCatRecordObjectInspector getRecordObjectInspector() {
-    return recordObjInspector;
-  }
-
-  @Override
-  protected StructField[] getBucketStructFields() {
-    return bucketStructFields;
-  }
-
-  protected ObjectInspector[] getBucketObjectInspectors() {
-    return bucketObjInspectors;
-  }
-
-
-  @Override
-  public void write(long writeId, byte[] record)
-          throws StreamingIOFailure, SerializationError {
-    try {
-      Object encodedRow = encode(record);
-      int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(writeId, encodedRow);
-    } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction write id("
-              + writeId + ")", e);
-    }
-
-  }
-
   /**
    * Creates JsonSerDe
-   * @param tbl   used to create serde
-   * @param conf  used to create serde
-   * @return
+   *
    * @throws SerializationError if serde could not be initialized
    */
-  private static JsonSerDe createSerde(Table tbl, HiveConf conf)
-          throws SerializationError {
+  @Override
+  public JsonSerDe createSerde() throws SerializationError {
     try {
       Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
       JsonSerDe serde = new JsonSerDe();
       SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      this.serde = serde;
       return serde;
     } catch (SerDeException e) {
       throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e);
@@ -158,5 +72,4 @@ public class StrictJsonWriter extends AbstractRecordWriter {
       throw new SerializationError("Unable to convert byte[] record into Object", e);
     }
   }
-
 }
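
With no per-writer settings to configure, constructing the JSON writer via the new builder is a one-liner (sketch; each record handed to the connection is expected to be a UTF-8 encoded JSON object whose fields match the table columns):

    StrictJsonWriter writer = StrictJsonWriter.newBuilder().build();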

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index c0b7324..563cf66 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -18,163 +18,73 @@
 
 package org.apache.hive.streaming;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.RegexSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Text;
 
 /**
  * Streaming Writer handles text input data with regex. Uses
  * org.apache.hadoop.hive.serde2.RegexSerDe
+ *
+ * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
  */
 public class StrictRegexWriter extends AbstractRecordWriter {
+  private String regex;
   private RegexSerDe serde;
-  private final StructObjectInspector recordObjInspector;
-  private final ObjectInspector[] bucketObjInspectors;
-  private final StructField[] bucketStructFields;
 
-  /**
-   * @param endPoint the end point to write to
-   * @param conn     connection this Writer is to be used with
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
-    throws ConnectionError, SerializationError, StreamingException {
-    this(null, endPoint, null, conn);
+  private StrictRegexWriter(final Builder builder) {
+    this.regex = builder.regex;
   }
 
-  /**
-   * @param endPoint the end point to write to
-   * @param conf     a Hive conf object. Should be null if not using advanced Hive settings.
-   * @param conn     connection this Writer is to be used with
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
-    throws ConnectionError, SerializationError, StreamingException {
-    this(null, endPoint, conf, conn);
+  public static Builder newBuilder() {
+    return new Builder();
   }
 
-  /**
-   * @param regex    to parse the data
-   * @param endPoint the end point to write to
-   * @param conf     a Hive conf object. Should be null if not using advanced Hive settings.
-   * @param conn     connection this Writer is to be used with
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
-    throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, conf, conn);
-    this.serde = createSerde(tbl, conf, regex);
-    // get ObjInspectors for entire record and bucketed cols
-    try {
-      recordObjInspector = (StructObjectInspector) serde.getObjectInspector();
-      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
-    } catch (SerDeException e) {
-      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
-    }
+  public static class Builder {
+    private String regex;
 
-    // get StructFields for bucketed cols
-    bucketStructFields = new StructField[bucketIds.size()];
-    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
-    for (int i = 0; i < bucketIds.size(); i++) {
-      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    public Builder withRegex(final String regex) {
+      this.regex = regex;
+      return this;
     }
-  }
-
-  @Override
-  public AbstractSerDe getSerde() {
-    return serde;
-  }
-
-  @Override
-  protected StructObjectInspector getRecordObjectInspector() {
-    return recordObjInspector;
-  }
-
-  @Override
-  protected StructField[] getBucketStructFields() {
-    return bucketStructFields;
-  }
-
-  @Override
-  protected ObjectInspector[] getBucketObjectInspectors() {
-    return bucketObjInspectors;
-  }
 
-
-  @Override
-  public void write(long writeId, byte[] record)
-    throws StreamingIOFailure, SerializationError {
-    try {
-      Object encodedRow = encode(record);
-      int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(writeId, encodedRow);
-    } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction write id("
-        + writeId + ")", e);
+    public StrictRegexWriter build() {
+      return new StrictRegexWriter(this);
     }
   }
 
   /**
    * Creates RegexSerDe
    *
-   * @param tbl   used to create serde
-   * @param conf  used to create serde
-   * @param regex used to create serde
-   * @return
    * @throws SerializationError if serde could not be initialized
    */
-  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
-    throws SerializationError {
+  @Override
+  public RegexSerDe createSerde() throws SerializationError {
     try {
       Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
       tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
-      ArrayList<String> tableColumns = getCols(tbl);
-      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(inputColumns, ","));
       RegexSerDe serde = new RegexSerDe();
       SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      this.serde = serde;
       return serde;
     } catch (SerDeException e) {
       throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
     }
   }
 
-  private static ArrayList<String> getCols(Table table) {
-    List<FieldSchema> cols = table.getSd().getCols();
-    ArrayList<String> colNames = new ArrayList<String>(cols.size());
-    for (FieldSchema col : cols) {
-      colNames.add(col.getName().toLowerCase());
-    }
-    return colNames;
-  }
-
   /**
    * Encode Utf8 encoded string bytes using RegexSerDe
    *
-   * @param utf8StrRecord
+   * @param utf8StrRecord - serialized record
    * @return The encoded object
-   * @throws SerializationError
+   * @throws SerializationError - in case of any deserialization error
    */
   @Override
   public Object encode(byte[] utf8StrRecord) throws SerializationError {

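The regex writer follows the same pattern: the parsing regex moves from a constructor argument to the builder. A sketch with a made-up two-column pattern:

    // Hypothetical pattern: one capture group per table column, in order.
    StrictRegexWriter writer = StrictRegexWriter.newBuilder()
        .withRegex("(\\d+)\\s+(\\w+)")
        .build();
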
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
deleted file mode 100644
index 2b05771..0000000
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-
-import java.util.Collection;
-
-/**
- * Represents a set of Transactions returned by Hive. Supports opening, writing to
- * and commiting/aborting each transaction. The interface is designed to ensure
- * transactions in a batch are used up sequentially. To stream to the same HiveEndPoint
- * concurrently, create separate StreamingConnections.
- *
- * Note on thread safety: At most 2 threads can run through a given TransactionBatch at the same
- * time.  One thread may call {@link #heartbeat()} and the other all other methods.
- * Violating this may result in "out of sequence response".
- *
- */
-public interface TransactionBatch  {
-  enum TxnState {
-    INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
-
-    private final String code;
-    TxnState(String code) {
-      this.code = code;
-    };
-    public String toString() {
-      return code;
-    }
-  }
-
-  /**
-   * Activate the next available transaction in the current transaction batch.
-   * @throws StreamingException if not able to switch to next Txn
-   * @throws InterruptedException if call in interrupted
-   */
-  void beginNextTransaction() throws StreamingException, InterruptedException;
-
-  /**
-   * Get Id of currently open transaction.
-   * @return transaction id
-   */
-  Long getCurrentTxnId();
-
-
-  /**
-   * Get write Id mapping to currently open transaction.
-   * @return write id
-   */
-  Long getCurrentWriteId();
-
-  /**
-   * get state of current transaction.
-   */
-  TxnState getCurrentTransactionState();
-
-  /**
-   * Commit the currently open transaction.
-   * @throws StreamingException if there are errors committing
-   * @throws InterruptedException if call in interrupted
-   */
-  void commit() throws StreamingException, InterruptedException;
-
-  /**
-   * Abort the currently open transaction.
-   * @throws StreamingException if there are errors
-   * @throws InterruptedException if call in interrupted
-   */
-  void abort() throws StreamingException, InterruptedException;
-
-  /**
-   * Remaining transactions are the ones that are not committed or aborted or open.
-   * Current open transaction is not considered part of remaining txns.
-   * @return number of transactions remaining this batch.
-   */
-  int remainingTransactions();
-
-
-  /**
-   *  Write record using RecordWriter.
-   * @param record  the data to be written
-   * @throws StreamingException if there are errors when writing
-   * @throws InterruptedException if call in interrupted
-   */
-  void write(byte[] record) throws StreamingException, InterruptedException;
-
-  /**
-   *  Write records using RecordWriter.
-   * @throws StreamingException if there are errors when writing
-   * @throws InterruptedException if call in interrupted
-   */
-  void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
-
-
-  /**
-   * Issues a heartbeat to hive metastore on the current and remaining txn ids
-   * to keep them from expiring.
-   * @throws StreamingException if there are errors
-   */
-  void heartbeat() throws StreamingException;
-
-  /**
-   * Close the TransactionBatch.
-   * @throws StreamingException if there are errors closing batch
-   * @throws InterruptedException if call in interrupted
-   */
-  void close() throws StreamingException, InterruptedException;
-  boolean isClosed();
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
deleted file mode 100644
index a8c8cd4..0000000
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class TransactionBatchUnAvailable extends StreamingException {
-  public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
-    super("Unable to acquire transaction batch on end point: " + ep, e);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
index a331b20..ae56e7c 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionError.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
@@ -19,11 +19,11 @@
 package org.apache.hive.streaming;
 
 public class TransactionError extends StreamingException {
-  public TransactionError(String msg, Exception e) {
+  TransactionError(String msg, Exception e) {
     super(msg + (e == null ? "" : ": " + e.getMessage()), e);
   }
 
-  public TransactionError(String msg) {
+  TransactionError(String msg) {
     super(msg);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
deleted file mode 100644
index f0843a1..0000000
--- a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import junit.framework.Assert;
-
-public class TestDelimitedInputWriter {
-  @Test
-  public void testFieldReordering() throws Exception {
-
-    ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
-    {//1)  test dropping fields - first middle  & last
-      String[] fieldNames = {null, "col2", null, "col4", null};
-      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
-      Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
-    }
-
-    {//2)  test reordering
-      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
-      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
-      Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) );
-    }
-
-    {//3)  test bad field names
-      String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
-      try {
-        DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
-        Assert.fail();
-      } catch (InvalidColumn e) {
-        // should throw
-      }
-    }
-
-    {//4)  test few field names
-      String[] fieldNames = {"col3", "col4"};
-      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
-      Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) );
-    }
-
-    {//5)  test extra field names
-      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
-      try {
-      DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
-      Assert.fail();
-      } catch (InvalidColumn e) {
-        //show throw
-      }
-    }
-  }
-}