Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/06/18 17:49:35 UTC
[3/6] asterixdb git commit: [NO ISSUE] Remove obsolete support for older HDFS versions
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
deleted file mode 100644
index 9dca49a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
-
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
-
- <property>
- <name>dfs.block.size</name>
- <value>1048576</value>
- </property>
-
- <property>
- <name>dfs.namenode.fs-limits.min-block-size</name>
- <value>1048576</value>
- </property>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
deleted file mode 100755
index 145b484..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
+++ /dev/null
@@ -1,112 +0,0 @@
-#/*
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- #*/
-# Define some default values that can be overridden by system properties
-hadoop.root.logger=FATAL,console
-hadoop.log.dir=.
-hadoop.log.file=hadoop.log
-
-# Define the root logger to the system property "hadoop.root.logger".
-log4j.rootLogger=${hadoop.root.logger}, EventCounter
-
-# Logging Threshold
-log4j.threshhold=FATAL
-
-#
-# Daily Rolling File Appender
-#
-
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
-
-# Rollver at midnight
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-# Debugging Pattern format
-#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
-
-#
-# console
-# Add "console" to rootlogger above if you want to use this
-#
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
-
-#
-# TaskLog Appender
-#
-
-#Default values
-hadoop.tasklog.taskid=null
-hadoop.tasklog.noKeepSplits=4
-hadoop.tasklog.totalLogFileSize=100
-hadoop.tasklog.purgeLogSplits=true
-hadoop.tasklog.logsRetainHours=12
-
-log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
-log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
-log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
-
-log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-
-#
-# Rolling File Appender
-#
-
-#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
-
-# Logfile size and and 30-day backups
-#log4j.appender.RFA.MaxFileSize=1MB
-#log4j.appender.RFA.MaxBackupIndex=30
-
-#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
-#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
-#
-# FSNamesystem Audit logging
-# All audit events are logged at INFO level
-#
-log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
-
-# Custom Logging levels
-
-#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
-#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
-#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
-
-# Jets3t library
-log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
-
-#
-# Event Counter Appender
-# Sends counts of logging messages at different severity levels to Hadoop Metrics.
-#
-log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
deleted file mode 100644
index e765c65..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
-
- <property>
- <name>mapred.job.tracker</name>
- <value>localhost:29007</value>
- </property>
- <property>
- <name>mapred.tasktracker.map.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.tasktracker.reduce.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.max.split.size</name>
- <value>2048</value>
- </property>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
deleted file mode 100644
index 68ffe00..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<cluster-topology>
- <network-switch name="all">
- <network-switch name="rack1">
- <terminal name="10.0.0.1"/>
- <terminal name="10.0.0.5"/>
- <terminal name="10.0.0.9"/>
- <terminal name="10.0.0.13"/>
- <terminal name="10.0.0.17"/>
- </network-switch>
- <network-switch name="rack2">
- <terminal name="10.0.0.2"/>
- <terminal name="10.0.0.6"/>
- <terminal name="10.0.0.10"/>
- <terminal name="10.0.0.14"/>
- <terminal name="10.0.0.18"/>
- </network-switch>
- <network-switch name="rack3">
- <terminal name="10.0.0.3"/>
- <terminal name="10.0.0.7"/>
- <terminal name="10.0.0.11"/>
- <terminal name="10.0.0.15"/>
- <terminal name="10.0.0.19"/>
- </network-switch>
- <network-switch name="rack4">
- <terminal name="10.0.0.4"/>
- <terminal name="10.0.0.8"/>
- <terminal name="10.0.0.12"/>
- <terminal name="10.0.0.16"/>
- <terminal name="10.0.0.20"/>
- </network-switch>
- </network-switch>
-</cluster-topology>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index f7fe397..409abbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -20,7 +20,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>hyracks-hdfs</artifactId>
- <packaging>pom</packaging>
<name>hyracks-hdfs</name>
<parent>
@@ -42,9 +41,156 @@
</license>
</licenses>
- <modules>
- <module>hyracks-hdfs-1.x</module>
- <module>hyracks-hdfs-2.x</module>
- <module>hyracks-hdfs-core</module>
- </modules>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <failOnWarning>true</failOnWarning>
+ <outputXML>true</outputXML>
+ <ignoredUnusedDeclaredDependencies>org.apache.hadoop:hadoop*::</ignoredUnusedDeclaredDependencies>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>analyze-only</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/data/customer.tbl</exclude>
+ <exclude>src/test/resources/expected/part-0</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-test-support</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..96b0a23
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A wrapper that generates TaskAttemptContext and JobContext instances.
+ */
+public class ContextFactory {
+
+ public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
+ try {
+ return new TaskAttemptContextImpl(conf, tid);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+ try {
+ TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
+ return new TaskAttemptContextImpl(conf, tid);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public JobContext createJobContext(Configuration conf) {
+ return new JobContextImpl(conf, new JobID("0", 0));
+ }
+
+}
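
For illustration, a minimal sketch of driving the new ContextFactory; the Configuration contents and the partition number are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hyracks.hdfs.ContextFactory;

    public class ContextFactoryExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            ContextFactory factory = new ContextFactory();
            // The int overload fabricates a reduce-side TaskAttemptID for the given partition.
            TaskAttemptContext ctx = factory.createContext(conf, 3);
            System.out.println(ctx.getTaskAttemptID()); // attempt__0000_r_000003_0
        }
    }
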
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java
new file mode 100644
index 0000000..0e2e8df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParser.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
+ *
+ * @param <K>
+ * the key type
+ * @param <V>
+ * the value type
+ */
+public interface IKeyValueParser<K, V> {
+
+ /**
+ * Initialize the key value parser.
+ *
+ * @param writer
+ * The hyracks writer for outputting data.
+ * @throws HyracksDataException
+ */
+ public void open(IFrameWriter writer) throws HyracksDataException;
+
+ /**
+ * @param key
+ * @param value
+ * @param writer
+ * @param fileString
+ * @throws HyracksDataException
+ */
+ public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException;
+
+ /**
+ * Flush the residual tuples in the internal buffer to the writer.
+ * This method is called in the close() of HDFSReadOperatorDescriptor.
+ *
+ * @param writer
+ * The hyracks writer for outputting data.
+ * @throws HyracksDataException
+ */
+ public void close(IFrameWriter writer) throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java
new file mode 100644
index 0000000..1cde639
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/IKeyValueParserFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
+ *
+ * @param <K>
+ * the key type
+ * @param <V>
+ * the value type
+ */
+public interface IKeyValueParserFactory<K, V> extends Serializable {
+
+ /**
+ * This method creates a key-value parser.
+ *
+ * @param ctx
+ * the IHyracksTaskContext
+ * @return a key-value parser instance.
+ */
+ public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java
new file mode 100644
index 0000000..fc66008
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollection.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+@SuppressWarnings("deprecation")
+public interface INcCollection {
+
+ public String findNearestAvailableSlot(InputSplit split);
+
+ public int numAvailableSlots();
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java
new file mode 100644
index 0000000..02c5fb3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/INcCollectionBuilder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+
+/**
+ * Builds an INcCollection from node controller information, per-NC workloads, and a slot limit.
+ *
+ * @author yingyib
+ */
+public interface INcCollectionBuilder {
+
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos, Map<String, List<String>> ipToNcMapping,
+ Map<String, Integer> ncNameToIndex, String[] NCs, int[] workloads, int slotLimit);
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java
new file mode 100644
index 0000000..f950750
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import java.io.DataOutput;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
+ */
+public interface ITupleWriter {
+
+ /**
+ * Initialize the tuple writer.
+ *
+ * @param output
+ * The channel for output data.
+ * @throws HyracksDataException
+ */
+ public void open(DataOutput output) throws HyracksDataException;
+
+ /**
+ * Write the tuple to the DataOutput.
+ *
+ * @param output
+ * the DataOutput channel
+ * @param tuple
+ * the tuple to write
+ * @throws HyracksDataException
+ */
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
+
+ /**
+ * Close the writer.
+ *
+ * @param output
+ * The channel for output data.
+ * @throws HyracksDataException
+ */
+ public void close(DataOutput output) throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java
new file mode 100644
index 0000000..57dadb0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
+ */
+public interface ITupleWriterFactory extends Serializable {
+
+ /**
+ * @param ctx
+ * the IHyracksTaskContext
+ * @return a tuple writer instance
+ */
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition)
+ throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
new file mode 100644
index 0000000..33b84dc
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ConfFactory implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private byte[] confBytes;
+
+ public ConfFactory(JobConf conf) throws HyracksDataException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ conf.write(dos);
+ confBytes = bos.toByteArray();
+ dos.close();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public synchronized JobConf getConf() throws HyracksDataException {
+ try {
+ JobConf conf = new JobConf();
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
+ conf.readFields(dis);
+ dis.close();
+ return conf;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
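
A small sketch of the round trip this factory provides: the constructor captures the JobConf as bytes at job-construction time, and getConf() rehydrates an equivalent copy on whichever node deserializes the factory (the property name below is illustrative):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hyracks.hdfs.dataflow.ConfFactory;

    public class ConfFactoryExample {
        public static void main(String[] args) throws Exception {
            JobConf original = new JobConf();
            original.set("example.marker", "42");            // illustrative property
            ConfFactory factory = new ConfFactory(original); // serialized to byte[]
            JobConf restored = factory.getConf();            // deserialized fresh copy
            System.out.println(restored.get("example.marker")); // prints 42
        }
    }
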
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
new file mode 100644
index 0000000..2aa45b7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.hdfs.api.IKeyValueParser;
+import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
+
+/**
+ * The HDFS file read operator using the Hadoop old API.
+ * To use this operator, a user needs to provide an IKeyValueParserFactory implementation which converts
+ * key-value pairs into tuples.
+ */
+@SuppressWarnings({ "rawtypes" })
+public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final ConfFactory confFactory;
+ private final InputSplitsFactory splitsFactory;
+ private final String[] scheduledLocations;
+ private final IKeyValueParserFactory tupleParserFactory;
+ private final boolean[] executed;
+
+ /**
+ * The constructor of HDFSReadOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param rd
+ * the output record descriptor
+ * @param conf
+ * the Hadoop JobConf object, which contains the input format and the input paths
+ * @param splits
+ * the array of FileSplits (HDFS chunks).
+ * @param scheduledLocations
+ * the node controller names to scan the FileSplits; this is a one-to-one mapping. The String array
+ * is obtained from org.apache.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplit[]).
+ * @param tupleParserFactory
+ * the IKeyValueParserFactory implementation instance.
+ * @throws HyracksException
+ */
+ public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, JobConf conf, InputSplit[] splits,
+ String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
+ super(spec, 0, 1);
+ try {
+ this.splitsFactory = new InputSplitsFactory(splits);
+ this.confFactory = new ConfFactory(conf);
+ } catch (Exception e) {
+ throw HyracksException.create(e);
+ }
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
+ this.tupleParserFactory = tupleParserFactory;
+ this.outRecDescs[0] = rd;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final InputSplit[] inputSplits = splitsFactory.getSplits();
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize() throws HyracksDataException {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ writer.open();
+ Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
+ JobConf conf = confFactory.getConf();
+ conf.setClassLoader(ctx.getJobletContext().getClassLoader());
+ IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+ try {
+ parser.open(writer);
+ InputFormat inputFormat = conf.getInputFormat();
+ for (int i = 0; i < inputSplits.length; i++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (scheduledLocations[i].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions on the same machine
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
+ Object key = reader.createKey();
+ Object value = reader.createValue();
+ while (reader.next(key, value)) {
+ parser.parse(key, value, writer, inputSplits[i].toString());
+ }
+ }
+ }
+ } finally {
+ parser.close(writer);
+ }
+ } catch (Throwable th) {
+ writer.fail();
+ throw HyracksDataException.create(th);
+ } finally {
+ writer.close();
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+ };
+ }
+}
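
A sketch of the wiring this operator expects, assuming split locations were assigned elsewhere (normally by the scheduler mentioned in the constructor Javadoc); the input path and node controller name are placeholders:

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
    import org.apache.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
    import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;

    public class HdfsReadSketch {
        public static HDFSReadOperatorDescriptor build(JobSpecification spec) throws Exception {
            JobConf conf = new JobConf();
            conf.setInputFormat(TextInputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path("hdfs://localhost:9000/input")); // placeholder
            InputSplit[] splits = conf.getInputFormat().getSplits(conf, 4);
            // One node controller name per split; normally produced by the HDFS scheduler.
            String[] locations = new String[splits.length];
            Arrays.fill(locations, "nc1"); // placeholder: schedule every split on one NC
            RecordDescriptor rd = new RecordDescriptor(
                    new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
            return new HDFSReadOperatorDescriptor(spec, rd, conf, splits, locations,
                    new TextKeyValueParserFactory());
        }
    }
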
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..0babc5f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import org.apache.hyracks.hdfs.api.ITupleWriter;
+import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
+
+/**
+ * The HDFS file write operator using the Hadoop old API.
+ * To use this operator, a user needs to provide an ITupleWriterFactory.
+ */
+@SuppressWarnings("deprecation")
+public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+ private ITupleWriterFactory tupleWriterFactory;
+
+ /**
+ * The constructor of HDFSWriteOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param conf
+ * the Hadoop JobConf which contains the output path
+ * @param tupleWriterFactory
+ * the ITupleWriterFactory implementation object
+ * @throws HyracksException
+ */
+ public HDFSWriteOperatorDescriptor(JobSpecification spec, JobConf conf, ITupleWriterFactory tupleWriterFactory)
+ throws HyracksException {
+ super(spec, 1, 0);
+ this.confFactory = new ConfFactory(conf);
+ this.tupleWriterFactory = tupleWriterFactory;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+ private FSDataOutputStream dos;
+ private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
+ private FrameTupleReference tuple = new FrameTupleReference();
+ private ITupleWriter tupleWriter;
+ private ClassLoader ctxCL;
+
+ @Override
+ public void open() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ JobConf conf = confFactory.getConf();
+ String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
+ String fileName = outputDirPath + File.separator + "part-" + partition;
+
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ dos = dfs.create(new Path(fileName), true);
+ tupleWriter.open(dos);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ tupleWriter.write(dos, tuple);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ tupleWriter.close(dos);
+ dos.close();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ };
+ }
+}
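
The matching write-side wiring, again as a sketch with a placeholder output path; as the open() above shows, each partition creates part-<partition> under the configured output directory:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
    import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;

    public class HdfsWriteSketch {
        public static HDFSWriteOperatorDescriptor build(JobSpecification spec) throws Exception {
            JobConf conf = new JobConf();
            // Each partition writes part-<partition> under this directory (placeholder path).
            FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/output"));
            return new HDFSWriteOperatorDescriptor(spec, conf, new TextTupleWriterFactory());
        }
    }
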
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
new file mode 100644
index 0000000..4402964
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings({ "rawtypes" })
+public class InputSplitsFactory implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private byte[] splitBytes;
+ private String splitClassName;
+
+ public InputSplitsFactory(InputSplit[] splits) throws HyracksDataException {
+ splitBytes = splitsToBytes(splits);
+ if (splits.length > 0) {
+ splitClassName = splits[0].getClass().getName();
+ } else {
+ splitClassName = FileSplit.class.getName();
+ }
+ }
+
+ public InputSplit[] getSplits() throws HyracksDataException {
+ return bytesToSplits(splitBytes);
+ }
+
+ /**
+ * Convert splits to bytes.
+ *
+ * @param splits
+ * input splits
+ * @return bytes which serialize the splits
+ * @throws HyracksDataException
+ */
+ private byte[] splitsToBytes(InputSplit[] splits) throws HyracksDataException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeInt(splits.length);
+ for (int i = 0; i < splits.length; i++) {
+ splits[i].write(dos);
+ }
+ dos.close();
+ return bos.toByteArray();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ /**
+ * Convert bytes to splits.
+ *
+ * @param bytes the serialized splits
+ * @return the deserialized input splits
+ * @throws HyracksDataException
+ */
+ private InputSplit[] bytesToSplits(byte[] bytes) throws HyracksDataException {
+ try {
+ Class splitClass = Class.forName(splitClassName);
+ Constructor[] constructors = splitClass.getDeclaredConstructors();
+ Constructor defaultConstructor = null;
+ for (Constructor constructor : constructors) {
+ if (constructor.getParameterTypes().length == 0) {
+ constructor.setAccessible(true);
+ defaultConstructor = constructor;
+ }
+ }
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bis);
+ int size = dis.readInt();
+ InputSplit[] splits = new InputSplit[size];
+ for (int i = 0; i < size; i++) {
+ splits[i] = (InputSplit) defaultConstructor.newInstance();
+ splits[i].readFields(dis);
+ }
+ dis.close();
+ return splits;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
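
The same serialize/deserialize pattern as ConfFactory, sketched for splits; note that getSplits() reconstructs instances through the recorded split class's no-arg constructor (the file path below is a placeholder):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;

    public class SplitsFactoryExample {
        public static void main(String[] args) throws Exception {
            InputSplit[] splits = { new FileSplit(new Path("/tmp/a.txt"), 0L, 1024L, new String[0]) };
            InputSplitsFactory factory = new InputSplitsFactory(splits); // records class name + bytes
            InputSplit[] restored = factory.getSplits();
            System.out.println(((FileSplit) restored[0]).getPath()); // /tmp/a.txt
        }
    }
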
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
new file mode 100644
index 0000000..01acddd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.lib;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class RawBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+ private static final IBinaryComparator comparator = RawBinaryComparatorFactory::compare;
+ public static final IBinaryComparatorFactory INSTANCE = new RawBinaryComparatorFactory();
+
+ private RawBinaryComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return comparator;
+ }
+
+ public static final int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int commonLength = Math.min(l1, l2);
+ for (int i = 0; i < commonLength; i++) {
+ if (b1[s1 + i] != b2[s2 + i]) {
+ return b1[s1 + i] - b2[s2 + i];
+ }
+ }
+ int difference = l1 - l2;
+ return difference == 0 ? 0 : (difference > 0 ? 1 : -1);
+ }
+}
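
The comparator orders byte arrays by the first differing byte (compared as signed bytes), falling back to length; a quick sketch:

    import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
    import org.apache.hyracks.hdfs.lib.RawBinaryComparatorFactory;

    public class RawCompareExample {
        public static void main(String[] args) throws Exception {
            IBinaryComparator cmp = RawBinaryComparatorFactory.INSTANCE.createBinaryComparator();
            byte[] a = { 1, 2, 3 };
            byte[] b = { 1, 2, 4 };
            System.out.println(cmp.compare(a, 0, a.length, b, 0, b.length)); // -1: a sorts first
        }
    }
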
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..047bbec
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.lib;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class RawBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static IBinaryHashFunctionFactory INSTANCE = new RawBinaryHashFunctionFactory();
+
+ private RawBinaryHashFunctionFactory() {
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ int value = 1;
+ int end = offset + length;
+ for (int i = offset; i < end; i++)
+ value = value * 31 + (int) bytes[i];
+ return value;
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
new file mode 100644
index 0000000..f6bf66c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.lib;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.hdfs.api.IKeyValueParser;
+import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class TextKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx)
+ throws HyracksDataException {
+
+ final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
+
+ return new IKeyValueParser<LongWritable, Text>() {
+
+ @Override
+ public void open(IFrameWriter writer) {
+ // no per-split state to initialize
+ }
+
+ @Override
+ public void parse(LongWritable key, Text value, IFrameWriter writer, String fileString)
+ throws HyracksDataException {
+ // build a one-field tuple from the text line and append it to the output frame
+ tb.reset();
+ tb.addField(value.getBytes(), 0, value.getLength());
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+ tb.getSize());
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ // flush the last, partially-filled frame
+ appender.write(writer, false);
+ }
+
+ };
+ }
+
+}
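
As a hedged sketch of how a read operator could drive this parser over an old-API record reader; the drainSplit helper and its arguments are illustrative, not the actual operator in hyracks-hdfs:

    // Illustrative driver loop; reader and writer are assumed to be supplied by the operator.
    static void drainSplit(org.apache.hadoop.mapred.RecordReader<LongWritable, Text> reader,
            IKeyValueParser<LongWritable, Text> parser, IFrameWriter writer, String fileName)
            throws Exception {
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        parser.open(writer);
        while (reader.next(key, value)) {
            parser.parse(key, value, writer, fileName); // one tuple per text line
        }
        parser.close(writer); // flushes the last, partially-filled frame
    }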
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
new file mode 100644
index 0000000..fd1438c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.lib;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.hdfs.api.ITupleWriter;
+import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class TextTupleWriterFactory implements ITupleWriterFactory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) {
+ return new ITupleWriter() {
+ private final byte newLine = '\n';
+
+ @Override
+ public void open(DataOutput output) {
+ // no header to write
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ // write field 0 as a line of text, terminated by '\n'
+ byte[] data = tuple.getFieldData(0);
+ int start = tuple.getFieldStart(0);
+ int len = tuple.getFieldLength(0);
+ try {
+ output.write(data, start, len);
+ output.writeByte(newLine);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) {
+ // nothing buffered here; the caller owns and flushes the stream
+ }
+
+ };
+ }
+
+}
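
A small sketch of the writer's output format; the single-field tuple below is hand-built for illustration, ctx may be null because this particular writer ignores it, and the fragment is assumed to run inside a method that declares throws HyracksDataException:

    byte[] line = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    ITupleReference tuple = new ITupleReference() {
        @Override
        public int getFieldCount() { return 1; }
        @Override
        public byte[] getFieldData(int fIdx) { return line; }
        @Override
        public int getFieldStart(int fIdx) { return 0; }
        @Override
        public int getFieldLength(int fIdx) { return line.length; }
    };
    java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
    new TextTupleWriterFactory().getTupleWriter(null, 0, 1)
            .write(new java.io.DataOutputStream(bos), tuple);
    // bos now holds the bytes of "hello\n"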
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
new file mode 100644
index 0000000..c53a779
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.hdfs.api.INcCollection;
+import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
+
+ @Override
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+ final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+ final int[] workloads, final int slotLimit) {
+ final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<>();
+ for (int i = 0; i < workloads.length; i++) {
+ if (workloads[i] < slotLimit) {
+ byte[] rawip;
+ try {
+ rawip = ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress();
+ } catch (UnknownHostException e) {
+ // TODO: surface this as a typed exception rather than an unchecked one
+ throw new RuntimeException(e);
+ }
+ BytesWritable ip = new BytesWritable(rawip);
+ IntWritable availableSlot = availableIpsToSlots.get(ip);
+ if (availableSlot == null) {
+ availableSlot = new IntWritable(slotLimit - workloads[i]);
+ availableIpsToSlots.put(ip, availableSlot);
+ } else {
+ availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+ }
+ }
+ }
+ return new INcCollection() {
+
+ @Override
+ public String findNearestAvailableSlot(InputSplit split) {
+ try {
+ String[] locs = split.getLocations();
+ int minDistance = Integer.MAX_VALUE;
+ BytesWritable currentCandidateIp = null;
+ if (locs != null && locs.length > 0) {
+ for (int j = 0; j < locs.length; j++) {
+ /*
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+ for (InetAddress ip : allIps) {
+ BytesWritable splitIp = new BytesWritable(ip.getAddress());
+ /*
+ * if the node controller exists
+ */
+ BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+ if (candidateNcIp == null) {
+ candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+ }
+ if (candidateNcIp != null) {
+ if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
+ byte[] candidateIP = candidateNcIp.getBytes();
+ byte[] splitIP = splitIp.getBytes();
+ // pack the four IPv4 bytes into signed ints; the numeric difference
+ // approximates network proximity (longer shared prefixes compare nearer)
+ int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
+ | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
+ int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
+ | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
+ int distance = Math.abs(candidateInt - splitInt);
+ if (minDistance > distance) {
+ minDistance = distance;
+ currentCandidateIp = candidateNcIp;
+ }
+ }
+ }
+ }
+ }
+ } else {
+ for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ currentCandidateIp = entry.getKey();
+ break;
+ }
+ }
+ }
+
+ if (currentCandidateIp != null) {
+ /*
+ * Update the entry of the selected IP
+ */
+ IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+ availableSlot.set(availableSlot.get() - 1);
+ if (availableSlot.get() == 0) {
+ availableIpsToSlots.remove(currentCandidateIp);
+ }
+ /*
+ * Update the entry of the selected NC
+ */
+ List<String> dataLocations = ipToNcMapping
+ .get(InetAddress.getByAddress(currentCandidateIp.getBytes()).getHostAddress());
+ for (String nc : dataLocations) {
+ int ncIndex = ncNameToIndex.get(nc);
+ if (workloads[ncIndex] < slotLimit) {
+ return nc;
+ }
+ }
+ }
+ /* not scheduled */
+ return null;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public int numAvailableSlots() {
+ // counts distinct IPs that still have capacity, not total open slots
+ return availableIpsToSlots.size();
+ }
+
+ };
+ }
+}
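
To make the proximity metric concrete: each IPv4 address is packed into a signed 32-bit int, and "distance" is the absolute numeric difference, so addresses sharing a long prefix compare as near. A worked example (the addresses are illustrative):

    static int toInt(byte[] ip) { // assumes a 4-byte IPv4 address
        return ip[0] << 24 | (ip[1] & 0xFF) << 16 | (ip[2] & 0xFF) << 8 | (ip[3] & 0xFF);
    }
    // toInt(10.0.0.9) - toInt(10.0.0.5) = 4, while
    // toInt(10.0.1.5) - toInt(10.0.0.5) = 256, so 10.0.0.9 is picked as "nearer".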
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
new file mode 100644
index 0000000..63be8c5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.hdfs.api.INcCollection;
+import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@SuppressWarnings("deprecation")
+public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ClusterTopology topology;
+
+ public RackAwareNcCollectionBuilder(ClusterTopology topology) {
+ this.topology = topology;
+ }
+
+ @Override
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+ final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+ final int[] workloads, final int slotLimit) {
+ try {
+ final Map<List<Integer>, List<String>> pathToNCs = new HashMap<>();
+ for (String NC : NCs) {
+ List<Integer> path = new ArrayList<>();
+ String ipAddress = InetAddress
+ .getByAddress(ncNameToNcInfos.get(NC).getNetworkAddress().lookupIpAddress()).getHostAddress();
+ topology.lookupNetworkTerminal(ipAddress, path);
+ if (path.isEmpty()) {
+ // if the hyracks nc is not in the defined cluster
+ path.add(Integer.MIN_VALUE);
+ LOGGER.info(NC + "'s IP address is not in the cluster topology file!");
+ }
+ List<String> ncs = pathToNCs.computeIfAbsent(path, k -> new ArrayList<>());
+ ncs.add(NC);
+ }
+
+ final TreeMap<List<Integer>, IntWritable> availableIpsToSlots =
+ new TreeMap<>((l1, l2) -> {
+ int commonLength = Math.min(l1.size(), l2.size());
+ for (int i = 0; i < commonLength; i++) {
+ int value1 = l1.get(i);
+ int value2 = l2.get(i);
+ int cmp = Integer.compare(value1, value2);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return Integer.compare(l1.size(), l2.size());
+ });
+ for (int i = 0; i < workloads.length; i++) {
+ if (workloads[i] < slotLimit) {
+ List<Integer> path = new ArrayList<>();
+ String ipAddress =
+ InetAddress.getByAddress(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress())
+ .getHostAddress();
+ topology.lookupNetworkTerminal(ipAddress, path);
+ if (path.isEmpty()) {
+ // if the hyracks nc is not in the defined cluster
+ path.add(Integer.MIN_VALUE);
+ }
+ IntWritable availableSlot = availableIpsToSlots.get(path);
+ if (availableSlot == null) {
+ availableSlot = new IntWritable(slotLimit - workloads[i]);
+ availableIpsToSlots.put(path, availableSlot);
+ } else {
+ availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+ }
+ }
+ }
+ return new INcCollection() {
+
+ @Override
+ public String findNearestAvailableSlot(InputSplit split) {
+ try {
+ String[] locs = split.getLocations();
+ int minDistance = Integer.MAX_VALUE;
+ List<Integer> currentCandidatePath = null;
+ if (locs != null && locs.length > 0) {
+ for (String loc : locs) {
+ /*
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(loc);
+ boolean inTopology = false;
+ for (InetAddress ip : allIps) {
+ List<Integer> splitPath = new ArrayList<>();
+ boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
+ if (!inCluster) {
+ continue;
+ }
+ inTopology = true;
+ /*
+ * if the node controller exists
+ */
+ List<Integer> candidatePath = availableIpsToSlots.floorKey(splitPath);
+ if (candidatePath == null) {
+ candidatePath = availableIpsToSlots.ceilingKey(splitPath);
+ }
+ if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
+ int distance = distance(splitPath, candidatePath);
+ if (minDistance > distance) {
+ minDistance = distance;
+ currentCandidatePath = candidatePath;
+ }
+ }
+ }
+
+ if (!inTopology) {
+ LOGGER.info(loc + "'s IP address is not in the cluster topology file!");
+ /*
+ * if the machine is not in the topology file
+ */
+ List<Integer> candidatePath = null;
+ for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ candidatePath = entry.getKey();
+ break;
+ }
+ }
+ /* fall back to the first path that still has capacity */
+ if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
+ currentCandidatePath = candidatePath;
+ }
+ }
+ }
+ } else {
+ for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ currentCandidatePath = entry.getKey();
+ break;
+ }
+ }
+ }
+
+ if (currentCandidatePath != null && !currentCandidatePath.isEmpty()) {
+ /*
+ * Update the entry of the selected IP
+ */
+ IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
+ availableSlot.set(availableSlot.get() - 1);
+ if (availableSlot.get() == 0) {
+ availableIpsToSlots.remove(currentCandidatePath);
+ }
+ /*
+ * Update the entry of the selected NC
+ */
+ List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+ for (String candidate : candidateNcs) {
+ int ncIndex = ncNameToIndex.get(candidate);
+ if (workloads[ncIndex] < slotLimit) {
+ return candidate;
+ }
+ }
+ }
+ /* not scheduled */
+ return null;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public int numAvailableSlots() {
+ return availableIpsToSlots.size();
+ }
+
+ private int distance(List<Integer> splitPath, List<Integer> candidatePath) {
+ int commonLength = Math.min(splitPath.size(), candidatePath.size());
+ int distance = 0;
+ for (int i = 0; i < commonLength; i++) {
+ distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+ }
+ List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+ for (int i = commonLength; i < restElements.size(); i++) {
+ distance = distance * 100 + Math.abs(restElements.get(i));
+ }
+ return distance;
+ }
+ };
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
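
To make the rack distance concrete, here is a standalone restatement of the private distance method above with two worked cases; the [rack, host] coordinates are illustrative:

    import java.util.Arrays;
    import java.util.List;

    public class RackDistanceExample {
        // Same base-100 positional metric as the private distance(...) above.
        static int distance(List<Integer> a, List<Integer> b) {
            int common = Math.min(a.size(), b.size());
            int d = 0;
            for (int i = 0; i < common; i++) {
                d = d * 100 + Math.abs(a.get(i) - b.get(i));
            }
            List<Integer> rest = a.size() > b.size() ? a : b;
            for (int i = common; i < rest.size(); i++) {
                d = d * 100 + Math.abs(rest.get(i));
            }
            return d;
        }

        public static void main(String[] args) {
            System.out.println(distance(Arrays.asList(1, 3), Arrays.asList(1, 7))); // 4: same rack
            System.out.println(distance(Arrays.asList(1, 3), Arrays.asList(2, 3))); // 100: different rack
        }
    }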