Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/06/18 17:49:35 UTC

[3/6] asterixdb git commit: [NO ISSUE] Remove obsolete support for older HDFS versions

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
deleted file mode 100644
index 9dca49a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
-
-  <property>
-    <name>dfs.replication</name>
-    <value>1</value>
-  </property>
-
-  <property>
-    <name>dfs.block.size</name>
-    <value>1048576</value>
-  </property>
-
-  <property>
-    <name>dfs.namenode.fs-limits.min-block-size</name>
-    <value>1048576</value>
-  </property>
-
-</configuration>
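
For context, these three settings map one-to-one onto Hadoop's Configuration API; a minimal sketch of the programmatic equivalent (class name hypothetical, not part of this change):

    import org.apache.hadoop.conf.Configuration;

    public class HdfsTestConf {
        public static Configuration build() {
            Configuration conf = new Configuration();
            // single replica and 1 MB blocks, as in the deleted hdfs-site.xml
            conf.setInt("dfs.replication", 1);
            conf.setLong("dfs.block.size", 1048576);
            conf.setLong("dfs.namenode.fs-limits.min-block-size", 1048576);
            return conf;
        }
    }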

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
deleted file mode 100755
index 145b484..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
+++ /dev/null
@@ -1,112 +0,0 @@
-#/*
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements.  See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership.  The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License.  You may obtain a copy of the License at
- #
- #   http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied.  See the License for the
- # specific language governing permissions and limitations
- # under the License.
- #*/
-# Define some default values that can be overridden by system properties
-hadoop.root.logger=FATAL,console
-hadoop.log.dir=.
-hadoop.log.file=hadoop.log
-
-# Define the root logger to the system property "hadoop.root.logger".
-log4j.rootLogger=${hadoop.root.logger}, EventCounter
-
-# Logging Threshold
-log4j.threshhold=FATAL
-
-#
-# Daily Rolling File Appender
-#
-
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
-
-# Rollver at midnight
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-# Debugging Pattern format
-#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
-
-#
-# console
-# Add "console" to rootlogger above if you want to use this 
-#
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
-
-#
-# TaskLog Appender
-#
-
-#Default values
-hadoop.tasklog.taskid=null
-hadoop.tasklog.noKeepSplits=4
-hadoop.tasklog.totalLogFileSize=100
-hadoop.tasklog.purgeLogSplits=true
-hadoop.tasklog.logsRetainHours=12
-
-log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
-log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
-log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
-
-log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-
-#
-# Rolling File Appender
-#
-
-#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
-
-# Logfile size and and 30-day backups
-#log4j.appender.RFA.MaxFileSize=1MB
-#log4j.appender.RFA.MaxBackupIndex=30
-
-#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
-#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
-#
-# FSNamesystem Audit logging
-# All audit events are logged at INFO level
-#
-log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
-
-# Custom Logging levels
-
-#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
-#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
-#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
-
-# Jets3t library
-log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
-
-#
-# Event Counter Appender
-# Sends counts of logging messages at different severity levels to Hadoop Metrics.
-#
-log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
deleted file mode 100644
index e765c65..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
-
-  <property>
-    <name>mapred.job.tracker</name>
-    <value>localhost:29007</value>
-  </property>
-  <property>
-    <name>mapred.tasktracker.map.tasks.maximum</name>
-    <value>20</value>
-  </property>
-  <property>
-    <name>mapred.tasktracker.reduce.tasks.maximum</name>
-    <value>20</value>
-  </property>
-  <property>
-    <name>mapred.max.split.size</name>
-    <value>2048</value>
-  </property>
-
-</configuration>
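
As with hdfs-site.xml above, the removed MapReduce settings have direct JobConf equivalents; a sketch under the same assumptions (hypothetical class name):

    import org.apache.hadoop.mapred.JobConf;

    public class MapredTestConf {
        public static JobConf build() {
            JobConf conf = new JobConf();
            conf.set("mapred.job.tracker", "localhost:29007");
            conf.setInt("mapred.tasktracker.map.tasks.maximum", 20);
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 20);
            conf.setLong("mapred.max.split.size", 2048);
            return conf;
        }
    }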

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
deleted file mode 100644
index 68ffe00..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<cluster-topology>
-  <network-switch name="all">
-    <network-switch name="rack1">
-      <terminal name="10.0.0.1"/>
-      <terminal name="10.0.0.5"/>
-      <terminal name="10.0.0.9"/>
-      <terminal name="10.0.0.13"/>
-      <terminal name="10.0.0.17"/>
-    </network-switch>
-    <network-switch name="rack2">
-      <terminal name="10.0.0.2"/>
-      <terminal name="10.0.0.6"/>
-      <terminal name="10.0.0.10"/>
-      <terminal name="10.0.0.14"/>
-      <terminal name="10.0.0.18"/>
-    </network-switch>
-    <network-switch name="rack3">
-      <terminal name="10.0.0.3"/>
-      <terminal name="10.0.0.7"/>
-      <terminal name="10.0.0.11"/>
-      <terminal name="10.0.0.15"/>
-      <terminal name="10.0.0.19"/>
-    </network-switch>
-    <network-switch name="rack4">
-      <terminal name="10.0.0.4"/>
-      <terminal name="10.0.0.8"/>
-      <terminal name="10.0.0.12"/>
-      <terminal name="10.0.0.16"/>
-      <terminal name="10.0.0.20"/>
-    </network-switch>
-  </network-switch>
-</cluster-topology>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index f7fe397..409abbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -20,7 +20,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <artifactId>hyracks-hdfs</artifactId>
-  <packaging>pom</packaging>
   <name>hyracks-hdfs</name>
 
   <parent>
@@ -42,9 +41,156 @@
     </license>
   </licenses>
 
-  <modules>
-    <module>hyracks-hdfs-1.x</module>
-    <module>hyracks-hdfs-2.x</module>
-    <module>hyracks-hdfs-core</module>
-  </modules>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <failOnWarning>true</failOnWarning>
+          <outputXML>true</outputXML>
+          <ignoredUnusedDeclaredDependencies>org.apache.hadoop:hadoop*::</ignoredUnusedDeclaredDependencies>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>process-test-classes</phase>
+            <goals>
+              <goal>analyze-only</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes combine.children="append">
+            <exclude>src/test/resources/data/customer.tbl</exclude>
+            <exclude>src/test/resources/expected/part-0</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>jsp-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-dataflow-std</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-dataflow-common</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-cc</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-nc</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-test-support</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
+  </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..96b0a23
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A wrapper that generates TaskAttemptContext instances.
+ */
+public class ContextFactory {
+
+    public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
+        try {
+            return new TaskAttemptContextImpl(conf, tid);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+        try {
+            TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
+            return new TaskAttemptContextImpl(conf, tid);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public JobContext createJobContext(Configuration conf) {
+        return new JobContextImpl(conf, new JobID("0", 0));
+    }
+
+}
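
A minimal usage sketch for ContextFactory, assuming an empty Configuration and an illustrative partition number:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hyracks.hdfs.ContextFactory;

    public class ContextFactoryDemo {
        public static void main(String[] args) throws Exception {
            ContextFactory factory = new ContextFactory();
            // wraps partition 0 into a synthetic REDUCE TaskAttemptID
            TaskAttemptContext taskCtx = factory.createContext(new Configuration(), 0);
            System.out.println(taskCtx.getTaskAttemptID());
        }
    }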

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java
new file mode 100644
index 0000000..0e2e8df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
+ *
+ * @param <K>
+ *            the key type
+ * @param <V>
+ *            the value type
+ */
+public interface IKeyValueParser<K, V> {
+
+    /**
+     * Initialize the key value parser.
+     *
+     * @param writer
+     *            The hyracks writer for outputting data.
+     * @throws HyracksDataException
+     */
+    public void open(IFrameWriter writer) throws HyracksDataException;
+
+    /**
+     * Parse a key-value pair and push the resulting tuples to the writer.
+     *
+     * @param key
+     *            the input key
+     * @param value
+     *            the input value
+     * @param writer
+     *            The hyracks writer for outputting data.
+     * @param fileString
+     *            the string representation of the split being read
+     * @throws HyracksDataException
+     */
+    public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException;
+
+    /**
+     * Flush the residual tuples in the internal buffer to the writer.
+     * This method is called in the close() of HDFSReadOperatorDescriptor.
+     *
+     * @param writer
+     *            The hyracks writer for outputting data.
+     * @throws HyracksDataException
+     */
+    public void close(IFrameWriter writer) throws HyracksDataException;
+}
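
For illustration, a minimal IKeyValueParser that emits each text line as a one-field tuple; LineKeyValueParser is a hypothetical name, and the frame plumbing follows the pattern of the bundled TextKeyValueParserFactory:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.comm.VSizeFrame;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
    import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
    import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
    import org.apache.hyracks.hdfs.api.IKeyValueParser;

    public class LineKeyValueParser implements IKeyValueParser<LongWritable, Text> {
        private final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
        private final FrameTupleAppender appender;

        public LineKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException {
            appender = new FrameTupleAppender(new VSizeFrame(ctx));
        }

        @Override
        public void open(IFrameWriter writer) throws HyracksDataException {
            // nothing to set up; the appender starts with an empty frame
        }

        @Override
        public void parse(LongWritable key, Text value, IFrameWriter writer, String fileString)
                throws HyracksDataException {
            tb.reset();
            tb.addField(value.getBytes(), 0, value.getLength());
            // appends the tuple, flushing the current frame to the writer when full
            FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
        }

        @Override
        public void close(IFrameWriter writer) throws HyracksDataException {
            appender.write(writer, false); // flush the residual, partially filled frame
        }
    }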

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java
new file mode 100644
index 0000000..1cde639
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
+ *
+ * @param <K>
+ *            the key type
+ * @param <V>
+ *            the value type
+ */
+public interface IKeyValueParserFactory<K, V> extends Serializable {
+
+    /**
+     * This method creates a key-value parser.
+     *
+     * @param ctx
+     *            the IHyracksTaskContext
+     * @return a key-value parser instance.
+     */
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException;
+
+}
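
A matching factory sketch (again a hypothetical name) that could be handed to an HDFSReadOperatorDescriptor:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.hdfs.api.IKeyValueParser;
    import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;

    public class LineKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
        private static final long serialVersionUID = 1L;

        @Override
        public IKeyValueParser<LongWritable, Text> createKeyValueParser(IHyracksTaskContext ctx)
                throws HyracksDataException {
            return new LineKeyValueParser(ctx); // one parser instance per task
        }
    }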

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java
new file mode 100644
index 0000000..fc66008
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+@SuppressWarnings("deprecation")
+public interface INcCollection {
+
+    public String findNearestAvailableSlot(InputSplit split);
+
+    public int numAvailableSlots();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java
new file mode 100644
index 0000000..02c5fb3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+
+/**
+ * Builds an INcCollection over the available node controllers for HDFS split scheduling.
+ *
+ * @author yingyib
+ */
+public interface INcCollectionBuilder {
+
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos, Map<String, List<String>> ipToNcMapping,
+            Map<String, Integer> ncNameToIndex, String[] NCs, int[] workloads, int slotLimit);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java
new file mode 100644
index 0000000..f950750
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import java.io.DataOutput;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
+ */
+public interface ITupleWriter {
+
+    /**
+     * Initialize the tuple writer.
+     *
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void open(DataOutput output) throws HyracksDataException;
+
+    /**
+     * Write the tuple to the DataOutput.
+     *
+     * @param output
+     *            the DataOutput channel
+     * @param tuple
+     *            the tuple to write
+     * @throws HyracksDataException
+     */
+    public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Close the writer.
+     *
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void close(DataOutput output) throws HyracksDataException;
+
+}
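
For illustration, a minimal ITupleWriter that dumps the first field of each tuple as raw bytes followed by a newline; RawFieldTupleWriter is a hypothetical name:

    import java.io.DataOutput;

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
    import org.apache.hyracks.hdfs.api.ITupleWriter;

    public class RawFieldTupleWriter implements ITupleWriter {
        @Override
        public void open(DataOutput output) throws HyracksDataException {
            // no header to write for a plain byte-oriented file
        }

        @Override
        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
            try {
                output.write(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
                output.writeByte('\n');
            } catch (Exception e) {
                throw HyracksDataException.create(e);
            }
        }

        @Override
        public void close(DataOutput output) throws HyracksDataException {
            // the operator closes the underlying stream
        }
    }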

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java
new file mode 100644
index 0000000..57dadb0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
+ */
+public interface ITupleWriterFactory extends Serializable {
+
+    /**
+     * @param ctx
+     *            the IHyracksTaskContext
+     * @return a tuple writer instance
+     */
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition)
+            throws HyracksDataException;
+
+}
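
And the corresponding hypothetical factory for the sketch above:

    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.hdfs.api.ITupleWriter;
    import org.apache.hyracks.hdfs.api.ITupleWriterFactory;

    public class RawFieldTupleWriterFactory implements ITupleWriterFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition)
                throws HyracksDataException {
            return new RawFieldTupleWriter(); // one writer per output partition
        }
    }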

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
new file mode 100644
index 0000000..33b84dc
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ConfFactory implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private byte[] confBytes;
+
+    public ConfFactory(JobConf conf) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            conf.write(dos);
+            confBytes = bos.toByteArray();
+            dos.close();
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public synchronized JobConf getConf() throws HyracksDataException {
+        try {
+            JobConf conf = new JobConf();
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
+            conf.readFields(dis);
+            dis.close();
+            return conf;
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
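
ConfFactory exists because JobConf is not java.io.Serializable; the configuration travels as Writable-serialized bytes instead. A round-trip sketch (the fs.defaultFS value is illustrative):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hyracks.hdfs.dataflow.ConfFactory;

    public class ConfFactoryDemo {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();
            conf.set("fs.defaultFS", "hdfs://localhost:9000"); // illustrative value
            ConfFactory shipped = new ConfFactory(conf);       // serialized once, client side
            JobConf rehydrated = shipped.getConf();            // deserialized on each worker
            System.out.println(rehydrated.get("fs.defaultFS"));
        }
    }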

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
new file mode 100644
index 0000000..2aa45b7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.hdfs.api.IKeyValueParser;
+import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
+
+/**
+ * The HDFS file read operator using the Hadoop old API.
+ * To use this operator, a user needs to provide an IKeyValueParserFactory implementation that converts
+ * key-value pairs into tuples.
+ */
+@SuppressWarnings({ "rawtypes" })
+public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final ConfFactory confFactory;
+    private final InputSplitsFactory splitsFactory;
+    private final String[] scheduledLocations;
+    private final IKeyValueParserFactory tupleParserFactory;
+    private final boolean[] executed;
+
+    /**
+     * The constructor of HDFSReadOperatorDescriptor.
+     *
+     * @param spec
+     *            the JobSpecification object
+     * @param rd
+     *            the output record descriptor
+     * @param conf
+     *            the Hadoop JobConf object, which contains the input format and the input paths
+     * @param splits
+     *            the array of FileSplits (HDFS chunks).
+     * @param scheduledLocations
+     *            the node controller names assigned to scan the FileSplits, a one-to-one mapping. The String array
+     *            is obtained from org.apache.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplit[]).
+     * @param tupleParserFactory
+     *            the ITupleParserFactory implementation instance.
+     * @throws HyracksException
+     */
+    public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, JobConf conf, InputSplit[] splits,
+            String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
+        super(spec, 0, 1);
+        try {
+            this.splitsFactory = new InputSplitsFactory(splits);
+            this.confFactory = new ConfFactory(conf);
+        } catch (Exception e) {
+            throw HyracksException.create(e);
+        }
+        this.scheduledLocations = scheduledLocations;
+        this.executed = new boolean[scheduledLocations.length];
+        Arrays.fill(executed, false);
+        this.tupleParserFactory = tupleParserFactory;
+        this.outRecDescs[0] = rd;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final InputSplit[] inputSplits = splitsFactory.getSplits();
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            private String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void initialize() throws HyracksDataException {
+                ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+                try {
+                    writer.open();
+                    Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
+                    JobConf conf = confFactory.getConf();
+                    conf.setClassLoader(ctx.getJobletContext().getClassLoader());
+                    IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+                    try {
+                        parser.open(writer);
+                        InputFormat inputFormat = conf.getInputFormat();
+                        for (int i = 0; i < inputSplits.length; i++) {
+                            /**
+                             * read all the partitions scheduled to the current node
+                             */
+                            if (scheduledLocations[i].equals(nodeName)) {
+                                /**
+                                 * pick an unread split to read
+                                 * synchronize among simultaneous partitions in the same machine
+                                 */
+                                synchronized (executed) {
+                                    if (!executed[i]) {
+                                        executed[i] = true;
+                                    } else {
+                                        continue;
+                                    }
+                                }
+
+                                /**
+                                 * read the split
+                                 */
+                                RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
+                                Object key = reader.createKey();
+                                Object value = reader.createValue();
+                                while (reader.next(key, value)) {
+                                    parser.parse(key, value, writer, inputSplits[i].toString());
+                                }
+                            }
+                        }
+                    } finally {
+                        parser.close(writer);
+                    }
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw HyracksDataException.create(th);
+                } finally {
+                    writer.close();
+                    Thread.currentThread().setContextClassLoader(ctxCL);
+                }
+            }
+        };
+    }
+}
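
A wiring sketch for the read operator, using the hypothetical LineKeyValueParserFactory from earlier; in a real job the scheduled locations come from org.apache.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(splits), and the input path is illustrative:

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
    import org.apache.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;

    public class ReadJobSketch {
        public static HDFSReadOperatorDescriptor build(JobSpecification spec) throws Exception {
            JobConf conf = new JobConf();
            conf.setInputFormat(TextInputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path("hdfs://localhost:9000/customer")); // hypothetical path
            InputSplit[] splits = conf.getInputFormat().getSplits(conf, 4);
            // one node controller name per split; normally produced by the HDFS scheduler
            String[] scheduledLocations = new String[splits.length];
            Arrays.fill(scheduledLocations, "nc1");
            RecordDescriptor rd =
                    new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
            return new HDFSReadOperatorDescriptor(spec, rd, conf, splits, scheduledLocations,
                    new LineKeyValueParserFactory());
        }
    }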

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..0babc5f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import org.apache.hyracks.hdfs.api.ITupleWriter;
+import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
+
+/**
+ * The HDFS file write operator using the Hadoop old API.
+ * To use this operator, a user needs to provide an ITupleWriterFactory implementation.
+ */
+@SuppressWarnings("deprecation")
+public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private ConfFactory confFactory;
+    private ITupleWriterFactory tupleWriterFactory;
+
+    /**
+     * The constructor of HDFSWriteOperatorDescriptor.
+     *
+     * @param spec
+     *            the JobSpecification object
+     * @param conf
+     *            the Hadoop JobConf which contains the output path
+     * @param tupleWriterFactory
+     *            the ITupleWriterFactory implementation object
+     * @throws HyracksException
+     */
+    public HDFSWriteOperatorDescriptor(JobSpecification spec, JobConf conf, ITupleWriterFactory tupleWriterFactory)
+            throws HyracksException {
+        super(spec, 1, 0);
+        this.confFactory = new ConfFactory(conf);
+        this.tupleWriterFactory = tupleWriterFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+            private FSDataOutputStream dos;
+            private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
+            private FrameTupleReference tuple = new FrameTupleReference();
+            private ITupleWriter tupleWriter;
+            private ClassLoader ctxCL;
+
+            @Override
+            public void open() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                JobConf conf = confFactory.getConf();
+                String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
+                String fileName = outputDirPath + File.separator + "part-" + partition;
+
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
+                try {
+                    FileSystem dfs = FileSystem.get(conf);
+                    dos = dfs.create(new Path(fileName), true);
+                    tupleWriter.open(dos);
+                } catch (Exception e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                accessor.reset(buffer);
+                int tupleCount = accessor.getTupleCount();
+                for (int i = 0; i < tupleCount; i++) {
+                    tuple.reset(accessor, i);
+                    tupleWriter.write(dos, tuple);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    tupleWriter.close(dos);
+                    dos.close();
+                } catch (Exception e) {
+                    throw HyracksDataException.create(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
+                }
+            }
+
+        };
+    }
+}
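
A corresponding wiring sketch for the write side, with an illustrative output path and the hypothetical RawFieldTupleWriterFactory from earlier; each partition produces its own part-<partition> file under the output directory:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hyracks.api.exceptions.HyracksException;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;

    public class WriteJobSketch {
        public static HDFSWriteOperatorDescriptor build(JobSpecification spec) throws HyracksException {
            JobConf conf = new JobConf();
            FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/result")); // hypothetical path
            return new HDFSWriteOperatorDescriptor(spec, conf, new RawFieldTupleWriterFactory());
        }
    }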

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
new file mode 100644
index 0000000..4402964
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings({ "rawtypes" })
+public class InputSplitsFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private byte[] splitBytes;
+    private String splitClassName;
+
+    public InputSplitsFactory(InputSplit[] splits) throws HyracksDataException {
+        splitBytes = splitsToBytes(splits);
+        if (splits.length > 0) {
+            splitClassName = splits[0].getClass().getName();
+        } else {
+            splitClassName = FileSplit.class.getName();
+        }
+    }
+
+    public InputSplit[] getSplits() throws HyracksDataException {
+        return bytesToSplits(splitBytes);
+    }
+
+    /**
+     * Convert splits to bytes.
+     *
+     * @param splits
+     *            input splits
+     * @return bytes which serialize the splits
+     * @throws HyracksDataException
+     */
+    private byte[] splitsToBytes(InputSplit[] splits) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            dos.writeInt(splits.length);
+            for (int i = 0; i < splits.length; i++) {
+                splits[i].write(dos);
+            }
+            dos.close();
+            return bos.toByteArray();
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Convert bytes to splits.
+     *
+     * @param bytes
+     *            bytes which serialize the splits
+     * @return the deserialized input splits
+     * @throws HyracksDataException
+     */
+    private InputSplit[] bytesToSplits(byte[] bytes) throws HyracksDataException {
+        try {
+            Class splitClass = Class.forName(splitClassName);
+            Constructor[] constructors = splitClass.getDeclaredConstructors();
+            Constructor defaultConstructor = null;
+            for (Constructor constructor : constructors) {
+                if (constructor.getParameterTypes().length == 0) {
+                    constructor.setAccessible(true);
+                    defaultConstructor = constructor;
+                }
+            }
+            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+            DataInputStream dis = new DataInputStream(bis);
+            int size = dis.readInt();
+            InputSplit[] splits = new InputSplit[size];
+            for (int i = 0; i < size; i++) {
+                splits[i] = (InputSplit) defaultConstructor.newInstance();
+                splits[i].readFields(dis);
+            }
+            dis.close();
+            return splits;
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
new file mode 100644
index 0000000..01acddd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.lib;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class RawBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private static final IBinaryComparator comparator = RawBinaryComparatorFactory::compare;
+    public static final IBinaryComparatorFactory INSTANCE = new RawBinaryComparatorFactory();
+
+    private RawBinaryComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return comparator;
+    }
+
+    public static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        int commonLength = Math.min(l1, l2);
+        for (int i = 0; i < commonLength; i++) {
+            if (b1[s1 + i] != b2[s2 + i]) {
+                return b1[s1 + i] - b2[s2 + i];
+            }
+        }
+        return Integer.compare(l1, l2);
+    }
+}

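A small usage sketch for the comparator above, not part of the patch. Note that bytes are compared as signed Java values, and that when one input is a proper prefix of the other, the shorter input sorts first:

    byte[] abc = "abc".getBytes();
    byte[] abd = "abd".getBytes();
    // negative: the first differing bytes satisfy 'c' < 'd'
    int r1 = RawBinaryComparatorFactory.compare(abc, 0, abc.length, abd, 0, abd.length);
    // negative: "ab" is a proper prefix of "abc"
    int r2 = RawBinaryComparatorFactory.compare(abc, 0, 2, abc, 0, abc.length);
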
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..047bbec
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.lib;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class RawBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final IBinaryHashFunctionFactory INSTANCE = new RawBinaryHashFunctionFactory();
+
+    private RawBinaryHashFunctionFactory() {
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+        return new IBinaryHashFunction() {
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                int value = 1;
+                int end = offset + length;
+                for (int i = offset; i < end; i++) {
+                    value = value * 31 + bytes[i];
+                }
+                return value;
+            }
+        };
+    }
+
+}

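The hash above is the familiar 31-based polynomial over the raw bytes, i.e. the byte-array analogue of String.hashCode(). A hypothetical partitioning helper built on it, not part of the patch (partitionFor and nParts are illustrative names):

    static int partitionFor(byte[] key, int nParts) throws HyracksDataException {
        IBinaryHashFunction h = RawBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction();
        int hash = h.hash(key, 0, key.length);
        // mask off the sign bit so a negative hash cannot yield a negative partition
        return (hash & Integer.MAX_VALUE) % nParts;
    }
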
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
new file mode 100644
index 0000000..f6bf66c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.lib;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.hdfs.api.IKeyValueParser;
+import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class TextKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx)
+            throws HyracksDataException {
+
+        final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
+
+        return new IKeyValueParser<LongWritable, Text>() {
+
+            @Override
+            public void open(IFrameWriter writer) {
+
+            }
+
+            @Override
+            public void parse(LongWritable key, Text value, IFrameWriter writer, String fileString)
+                    throws HyracksDataException {
+                tb.reset();
+                tb.addField(value.getBytes(), 0, value.getLength());
+                FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                        tb.getSize());
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                appender.write(writer, false);
+            }
+
+        };
+    }
+
+}

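The parser returned above buffers tuples into a frame and only emits a frame once it fills, so close() must flush the remainder. A sketch of the loop a read operator might drive it with, not part of the patch; the task context, record reader, frame writer, and file path are assumed to come from the caller, and imports are elided:

    static void feedFile(IHyracksTaskContext ctx, RecordReader<LongWritable, Text> reader,
            IFrameWriter writer, String filePath) throws Exception {
        IKeyValueParser<LongWritable, Text> parser =
                new TextKeyValueParserFactory().createKeyValueParser(ctx);
        parser.open(writer);
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
            parser.parse(key, value, writer, filePath);
        }
        parser.close(writer); // flushes the last, partially filled frame
    }
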
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
new file mode 100644
index 0000000..fd1438c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.lib;
+
+import java.io.DataOutput;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.hdfs.api.ITupleWriter;
+import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class TextTupleWriterFactory implements ITupleWriterFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) {
+        return new ITupleWriter() {
+            private final byte newLine = '\n';
+
+            @Override
+            public void open(DataOutput output) {
+
+            }
+
+            @Override
+            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                byte[] data = tuple.getFieldData(0);
+                int start = tuple.getFieldStart(0);
+                int len = tuple.getFieldLength(0);
+                try {
+                    output.write(data, start, len);
+                    output.writeByte(newLine);
+                } catch (Exception e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+
+            @Override
+            public void close(DataOutput output) {
+
+            }
+
+        };
+    }
+
+}

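A sketch that feeds the writer a single one-field tuple, not part of the patch, assuming Hyracks' ArrayTupleReference helper; the anonymous writer above never touches the task context, so null is passed for it here purely for illustration:

    static void writeOneTuple() throws Exception {
        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
        byte[] text = "hello world".getBytes();
        tb.addField(text, 0, text.length);
        ArrayTupleReference tuple = new ArrayTupleReference();
        tuple.reset(tb.getFieldEndOffsets(), tb.getByteArray());

        ITupleWriter tw = new TextTupleWriterFactory().getTupleWriter(null, 0, 1);
        DataOutputStream out = new DataOutputStream(System.out);
        tw.open(out);
        tw.write(out, tuple); // prints "hello world" followed by a newline
        tw.close(out);
        out.flush();
    }
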
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
new file mode 100644
index 0000000..c53a779
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.hdfs.api.INcCollection;
+import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
+
+    @Override
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+            final int[] workloads, final int slotLimit) {
+        final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<>();
+        for (int i = 0; i < workloads.length; i++) {
+            if (workloads[i] < slotLimit) {
+                byte[] rawip;
+                try {
+                    rawip = ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress();
+                } catch (UnknownHostException e) {
+                    // QQQ Should probably have a neater solution than this
+                    throw new RuntimeException(e);
+                }
+                BytesWritable ip = new BytesWritable(rawip);
+                IntWritable availableSlot = availableIpsToSlots.get(ip);
+                if (availableSlot == null) {
+                    availableSlot = new IntWritable(slotLimit - workloads[i]);
+                    availableIpsToSlots.put(ip, availableSlot);
+                } else {
+                    availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+                }
+            }
+        }
+        return new INcCollection() {
+
+            @Override
+            public String findNearestAvailableSlot(InputSplit split) {
+                try {
+                    String[] locs = split.getLocations();
+                    long minDistance = Long.MAX_VALUE;
+                    BytesWritable currentCandidateIp = null;
+                    if (locs != null && locs.length > 0) {
+                        for (String loc : locs) {
+                            /*
+                             * get all the IP addresses from the name
+                             */
+                            InetAddress[] allIps = InetAddress.getAllByName(loc);
+                            for (InetAddress ip : allIps) {
+                                BytesWritable splitIp = new BytesWritable(ip.getAddress());
+                                /*
+                                 * if the node controller exists
+                                 */
+                                BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+                                if (candidateNcIp == null) {
+                                    candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+                                }
+                                if (candidateNcIp != null && availableIpsToSlots.get(candidateNcIp).get() > 0) {
+                                    byte[] candidateIP = candidateNcIp.getBytes();
+                                    byte[] splitIP = splitIp.getBytes();
+                                    int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
+                                            | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
+                                    int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
+                                            | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
+                                    // widen to long so the subtraction cannot overflow int
+                                    long distance = Math.abs((long) candidateInt - splitInt);
+                                    if (minDistance > distance) {
+                                        minDistance = distance;
+                                        currentCandidateIp = candidateNcIp;
+                                    }
+                                }
+                            }
+                        }
+                    } else {
+                        for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                            if (entry.getValue().get() > 0) {
+                                currentCandidateIp = entry.getKey();
+                                break;
+                            }
+                        }
+                    }
+
+                    if (currentCandidateIp != null) {
+                        /*
+                         * Update the entry of the selected IP
+                         */
+                        IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+                        availableSlot.set(availableSlot.get() - 1);
+                        if (availableSlot.get() == 0) {
+                            availableIpsToSlots.remove(currentCandidateIp);
+                        }
+                        /*
+                         * Update the entry of the selected NC
+                         */
+                        List<String> dataLocations = ipToNcMapping
+                                .get(InetAddress.getByAddress(currentCandidateIp.getBytes()).getHostAddress());
+                        for (String nc : dataLocations) {
+                            int ncIndex = ncNameToIndex.get(nc);
+                            if (workloads[ncIndex] < slotLimit) {
+                                return nc;
+                            }
+                        }
+                    }
+                    /* not scheduled */
+                    return null;
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            @Override
+            public int numAvailableSlots() {
+                return availableIpsToSlots.size();
+            }
+
+        };
+    }
+}

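The proximity search above leans on TreeMap.floorKey/ceilingKey to find the NC whose IPv4 address, read as an integer, is numerically closest to a split replica's address. The same lookup with plain int keys, not part of the patch (addresses and slot counts made up):

    TreeMap<Integer, Integer> slots = new TreeMap<>();
    slots.put(0x0A000005, 2); // 10.0.0.5  -> 2 available slots
    slots.put(0x0A000120, 1); // 10.0.1.32 -> 1 available slot

    int splitIp = 0x0A000007;                    // replica lives on 10.0.0.7
    Integer candidate = slots.floorKey(splitIp); // 10.0.0.5, nearest key at or below
    if (candidate == null) {
        candidate = slots.ceilingKey(splitIp);   // otherwise nearest key above
    }

As in the builder, the ceiling is consulted only when no floor exists, so the result approximates the minimum distance rather than guaranteeing it.
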
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
new file mode 100644
index 0000000..63be8c5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.hdfs.api.INcCollection;
+import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@SuppressWarnings("deprecation")
+public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ClusterTopology topology;
+
+    public RackAwareNcCollectionBuilder(ClusterTopology topology) {
+        this.topology = topology;
+    }
+
+    @Override
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+            final int[] workloads, final int slotLimit) {
+        try {
+            final Map<List<Integer>, List<String>> pathToNCs = new HashMap<>();
+            for (String NC : NCs) {
+                List<Integer> path = new ArrayList<>();
+                String ipAddress = InetAddress
+                        .getByAddress(ncNameToNcInfos.get(NC).getNetworkAddress().lookupIpAddress()).getHostAddress();
+                topology.lookupNetworkTerminal(ipAddress, path);
+                if (path.isEmpty()) {
+                    // if the hyracks nc is not in the defined cluster
+                    path.add(Integer.MIN_VALUE);
+                    LOGGER.info("{}'s IP address is not in the cluster topology file!", NC);
+                }
+                List<String> ncs = pathToNCs.computeIfAbsent(path, k -> new ArrayList<>());
+                ncs.add(NC);
+            }
+
+            final TreeMap<List<Integer>, IntWritable> availableIpsToSlots =
+                    new TreeMap<>((l1, l2) -> {
+                        int commonLength = Math.min(l1.size(), l2.size());
+                        for (int i = 0; i < commonLength; i++) {
+                            int value1 = l1.get(i);
+                            int value2 = l2.get(i);
+                            int cmp = Integer.compare(value1, value2);
+                            if (cmp != 0) {
+                                return cmp;
+                            }
+                        }
+                        return Integer.compare(l1.size(), l2.size());
+                    });
+            for (int i = 0; i < workloads.length; i++) {
+                if (workloads[i] < slotLimit) {
+                    List<Integer> path = new ArrayList<>();
+                    String ipAddress =
+                            InetAddress.getByAddress(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress())
+                                    .getHostAddress();
+                    topology.lookupNetworkTerminal(ipAddress, path);
+                    if (path.isEmpty()) {
+                        // if the hyracks nc is not in the defined cluster
+                        path.add(Integer.MIN_VALUE);
+                    }
+                    IntWritable availableSlot = availableIpsToSlots.get(path);
+                    if (availableSlot == null) {
+                        availableSlot = new IntWritable(slotLimit - workloads[i]);
+                        availableIpsToSlots.put(path, availableSlot);
+                    } else {
+                        availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+                    }
+                }
+            }
+            return new INcCollection() {
+
+                @Override
+                public String findNearestAvailableSlot(InputSplit split) {
+                    try {
+                        String[] locs = split.getLocations();
+                        int minDistance = Integer.MAX_VALUE;
+                        List<Integer> currentCandidatePath = null;
+                        if (locs != null && locs.length > 0) {
+                            for (String loc : locs) {
+                                /*
+                                 * get all the IP addresses from the name
+                                 */
+                                InetAddress[] allIps = InetAddress.getAllByName(loc);
+                                boolean inTopology = false;
+                                for (InetAddress ip : allIps) {
+                                    List<Integer> splitPath = new ArrayList<>();
+                                    boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
+                                    if (!inCluster) {
+                                        continue;
+                                    }
+                                    inTopology = true;
+                                    /*
+                                     * if the node controller exists
+                                     */
+                                    List<Integer> candidatePath = availableIpsToSlots.floorKey(splitPath);
+                                    if (candidatePath == null) {
+                                        candidatePath = availableIpsToSlots.ceilingKey(splitPath);
+                                    }
+                                    if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
+                                        int distance = distance(splitPath, candidatePath);
+                                        if (minDistance > distance) {
+                                            minDistance = distance;
+                                            currentCandidatePath = candidatePath;
+                                        }
+                                    }
+                                }
+
+                                if (!inTopology) {
+                                    LOGGER.info("{}'s IP address is not in the cluster topology file!", loc);
+                                    /*
+                                     * if the machine is not in the topology file
+                                     */
+                                    List<Integer> candidatePath = null;
+                                    for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                                        if (entry.getValue().get() > 0) {
+                                            candidatePath = entry.getKey();
+                                            break;
+                                        }
+                                    }
+                                    /* fall back to the first path that still has slots */
+                                    if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
+                                        currentCandidatePath = candidatePath;
+                                    }
+                                }
+                            }
+                        } else {
+                            for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                                if (entry.getValue().get() > 0) {
+                                    currentCandidatePath = entry.getKey();
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (currentCandidatePath != null && !currentCandidatePath.isEmpty()) {
+                            /*
+                             * Update the entry of the selected IP
+                             */
+                            IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
+                            availableSlot.set(availableSlot.get() - 1);
+                            if (availableSlot.get() == 0) {
+                                availableIpsToSlots.remove(currentCandidatePath);
+                            }
+                            /*
+                             * Update the entry of the selected NC
+                             */
+                            List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+                            for (String candidate : candidateNcs) {
+                                int ncIndex = ncNameToIndex.get(candidate);
+                                if (workloads[ncIndex] < slotLimit) {
+                                    return candidate;
+                                }
+                            }
+                        }
+                        /* not scheduled */
+                        return null;
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+
+                @Override
+                public int numAvailableSlots() {
+                    return availableIpsToSlots.size();
+                }
+
+                private int distance(List<Integer> splitPath, List<Integer> candidatePath) {
+                    int commonLength = Math.min(splitPath.size(), candidatePath.size());
+                    int distance = 0;
+                    for (int i = 0; i < commonLength; i++) {
+                        distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                    }
+                    List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+                    for (int i = commonLength; i < restElements.size(); i++) {
+                        distance = distance * 100 + Math.abs(restElements.get(i));
+                    }
+                    return distance;
+                }
+            };
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+}
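
The distance metric treats a topology path as digits of a base-100 number, so a mismatch near the root of the topology normally outweighs any mismatch deeper down. A worked example under a hypothetical two-level topology (rack, then position within the rack), not part of the patch:

    List<Integer> a = Arrays.asList(0, 1); // rack 0, position 1
    List<Integer> b = Arrays.asList(0, 7); // rack 0, position 7
    // distance(a, b) = (0 * 100) + |1 - 7| = 6

    List<Integer> c = Arrays.asList(1, 1); // rack 1, position 1
    // distance(a, c) = (|0 - 1| * 100) + |1 - 1| = 100

The factor of 100 assumes fewer than 100 siblings per topology level; with a larger fan-out, a big mismatch at a deep level could outweigh a small one nearer the root.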