Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/06/18 17:49:37 UTC

[5/6] asterixdb git commit: [NO ISSUE] Remove obsolete support for older HDFS versions

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
deleted file mode 100644
index fd1438c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs.lib;
-
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.hdfs.api.ITupleWriter;
-import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class TextTupleWriterFactory implements ITupleWriterFactory {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) {
-        return new ITupleWriter() {
-            private byte newLine = "\n".getBytes()[0];
-
-            @Override
-            public void open(DataOutput output) {
-
-            }
-
-            @Override
-            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
-                byte[] data = tuple.getFieldData(0);
-                int start = tuple.getFieldStart(0);
-                int len = tuple.getFieldLength(0);
-                try {
-                    output.write(data, start, len);
-                    output.writeByte(newLine);
-                } catch (Exception e) {
-                    throw HyracksDataException.create(e);
-                }
-            }
-
-            @Override
-            public void close(DataOutput output) {
-
-            }
-
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
deleted file mode 100644
index c53a779..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs.scheduler;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputSplit;
-
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.hdfs.api.INcCollection;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-
-@SuppressWarnings("deprecation")
-public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
-
-    @Override
-    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
-            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
-            final int[] workloads, final int slotLimit) {
-        final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
-        for (int i = 0; i < workloads.length; i++) {
-            if (workloads[i] < slotLimit) {
-                byte[] rawip;
-                try {
-                    rawip = ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress();
-                } catch (UnknownHostException e) {
-                    // QQQ Should probably have a neater solution than this
-                    throw new RuntimeException(e);
-                }
-                BytesWritable ip = new BytesWritable(rawip);
-                IntWritable availableSlot = availableIpsToSlots.get(ip);
-                if (availableSlot == null) {
-                    availableSlot = new IntWritable(slotLimit - workloads[i]);
-                    availableIpsToSlots.put(ip, availableSlot);
-                } else {
-                    availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
-                }
-            }
-        }
-        return new INcCollection() {
-
-            @Override
-            public String findNearestAvailableSlot(InputSplit split) {
-                try {
-                    String[] locs = split.getLocations();
-                    int minDistance = Integer.MAX_VALUE;
-                    BytesWritable currentCandidateIp = null;
-                    if (locs != null && locs.length > 0) {
-                        for (int j = 0; j < locs.length; j++) {
-                            /**
-                             * get all the IP addresses from the name
-                             */
-                            InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
-                            for (InetAddress ip : allIps) {
-                                BytesWritable splitIp = new BytesWritable(ip.getAddress());
-                                /**
-                                 * if the node controller exists
-                                 */
-                                BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
-                                if (candidateNcIp == null) {
-                                    candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
-                                }
-                                if (candidateNcIp != null) {
-                                    if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
-                                        byte[] candidateIP = candidateNcIp.getBytes();
-                                        byte[] splitIP = splitIp.getBytes();
-                                        int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
-                                                | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
-                                        int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
-                                                | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
-                                        int distance = Math.abs(candidateInt - splitInt);
-                                        if (minDistance > distance) {
-                                            minDistance = distance;
-                                            currentCandidateIp = candidateNcIp;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    } else {
-                        for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
-                            if (entry.getValue().get() > 0) {
-                                currentCandidateIp = entry.getKey();
-                                break;
-                            }
-                        }
-                    }
-
-                    if (currentCandidateIp != null) {
-                        /**
-                         * Update the entry of the selected IP
-                         */
-                        IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
-                        availableSlot.set(availableSlot.get() - 1);
-                        if (availableSlot.get() == 0) {
-                            availableIpsToSlots.remove(currentCandidateIp);
-                        }
-                        /**
-                         * Update the entry of the selected NC
-                         */
-                        List<String> dataLocations = ipToNcMapping
-                                .get(InetAddress.getByAddress(currentCandidateIp.getBytes()).getHostAddress());
-                        for (String nc : dataLocations) {
-                            int ncIndex = ncNameToIndex.get(nc);
-                            if (workloads[ncIndex] < slotLimit) {
-                                return nc;
-                            }
-                        }
-                    }
-                    /** not scheduled */
-                    return null;
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-
-            @Override
-            public int numAvailableSlots() {
-                return availableIpsToSlots.size();
-            }
-
-        };
-    }
-}
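
For reference, the proximity metric in the removed IPProximityNcCollectionBuilder packs each four-byte IPv4 address into a signed int and uses the absolute numeric difference between two packed addresses as the distance. A minimal standalone sketch of that computation, with hypothetical helper names (as in the removed code, the first octet is not masked, so addresses with a first octet >= 128 pack to negative ints):

    final class IpProximitySketch {
        /** Packs a 4-byte IPv4 address into an int, exactly as the removed builder did. */
        static int packIpv4(byte[] ip) {
            return ip[0] << 24 | (ip[1] & 0xFF) << 16 | (ip[2] & 0xFF) << 8 | (ip[3] & 0xFF);
        }

        /** Numeric "IP proximity" between two IPv4 addresses: smaller means closer. */
        static int ipDistance(byte[] a, byte[] b) {
            return Math.abs(packIpv4(a) - packIpv4(b));
        }
        // Example: 10.0.0.5 vs 10.0.0.7 -> distance 2; 10.0.0.5 vs 10.0.1.5 -> distance 256.
    }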

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
deleted file mode 100644
index 63be8c5..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs.scheduler;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.topology.ClusterTopology;
-import org.apache.hyracks.hdfs.api.INcCollection;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-@SuppressWarnings("deprecation")
-public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
-    private static final Logger LOGGER = LogManager.getLogger();
-    private ClusterTopology topology;
-
-    public RackAwareNcCollectionBuilder(ClusterTopology topology) {
-        this.topology = topology;
-    }
-
-    @Override
-    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
-            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
-            final int[] workloads, final int slotLimit) {
-        try {
-            final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
-            for (String NC : NCs) {
-                List<Integer> path = new ArrayList<>();
-                String ipAddress = InetAddress
-                        .getByAddress(ncNameToNcInfos.get(NC).getNetworkAddress().lookupIpAddress()).getHostAddress();
-                topology.lookupNetworkTerminal(ipAddress, path);
-                if (path.isEmpty()) {
-                    // if the hyracks nc is not in the defined cluster
-                    path.add(Integer.MIN_VALUE);
-                    LOGGER.info(NC + "'s IP address is not in the cluster topology file!");
-                }
-                List<String> ncs = pathToNCs.computeIfAbsent(path, k -> new ArrayList<>());
-                ncs.add(NC);
-            }
-
-            final TreeMap<List<Integer>, IntWritable> availableIpsToSlots =
-                    new TreeMap<List<Integer>, IntWritable>((l1, l2) -> {
-                        int commonLength = Math.min(l1.size(), l2.size());
-                        for (int i = 0; i < commonLength; i++) {
-                            int value1 = l1.get(i);
-                            int value2 = l2.get(i);
-                            int cmp = Integer.compare(value1, value2);
-                            if (cmp != 0) {
-                                return cmp;
-                            }
-                        }
-                        return Integer.compare(l1.size(), l2.size());
-                    });
-            for (int i = 0; i < workloads.length; i++) {
-                if (workloads[i] < slotLimit) {
-                    List<Integer> path = new ArrayList<Integer>();
-                    String ipAddress =
-                            InetAddress.getByAddress(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress())
-                                    .getHostAddress();
-                    topology.lookupNetworkTerminal(ipAddress, path);
-                    if (path.isEmpty()) {
-                        // if the hyracks nc is not in the defined cluster
-                        path.add(Integer.MIN_VALUE);
-                    }
-                    IntWritable availableSlot = availableIpsToSlots.get(path);
-                    if (availableSlot == null) {
-                        availableSlot = new IntWritable(slotLimit - workloads[i]);
-                        availableIpsToSlots.put(path, availableSlot);
-                    } else {
-                        availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
-                    }
-                }
-            }
-            return new INcCollection() {
-
-                @Override
-                public String findNearestAvailableSlot(InputSplit split) {
-                    try {
-                        String[] locs = split.getLocations();
-                        int minDistance = Integer.MAX_VALUE;
-                        List<Integer> currentCandidatePath = null;
-                        if (locs != null && locs.length > 0) {
-                            for (String loc : locs) {
-                                /*
-                                 * get all the IP addresses from the name
-                                 */
-                                InetAddress[] allIps = InetAddress.getAllByName(loc);
-                                boolean inTopology = false;
-                                for (InetAddress ip : allIps) {
-                                    List<Integer> splitPath = new ArrayList<>();
-                                    boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
-                                    if (!inCluster) {
-                                        continue;
-                                    }
-                                    inTopology = true;
-                                    /*
-                                     * if the node controller exists
-                                     */
-                                    List<Integer> candidatePath = availableIpsToSlots.floorKey(splitPath);
-                                    if (candidatePath == null) {
-                                        candidatePath = availableIpsToSlots.ceilingKey(splitPath);
-                                    }
-                                    if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
-                                        int distance = distance(splitPath, candidatePath);
-                                        if (minDistance > distance) {
-                                            minDistance = distance;
-                                            currentCandidatePath = candidatePath;
-                                        }
-                                    }
-                                }
-
-                                if (!inTopology) {
-                                    LOGGER.info(loc + "'s IP address is not in the cluster topology file!");
-                                    /*
-                                     * if the machine is not in the topology file
-                                     */
-                                    List<Integer> candidatePath = null;
-                                    for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
-                                        if (entry.getValue().get() > 0) {
-                                            candidatePath = entry.getKey();
-                                            break;
-                                        }
-                                    }
-                                    /* the split path is empty */
-                                    if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
-                                        currentCandidatePath = candidatePath;
-                                    }
-                                }
-                            }
-                        } else {
-                            for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
-                                if (entry.getValue().get() > 0) {
-                                    currentCandidatePath = entry.getKey();
-                                    break;
-                                }
-                            }
-                        }
-
-                        if (currentCandidatePath != null && !currentCandidatePath.isEmpty()) {
-                            /*
-                             * Update the entry of the selected IP
-                             */
-                            IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
-                            availableSlot.set(availableSlot.get() - 1);
-                            if (availableSlot.get() == 0) {
-                                availableIpsToSlots.remove(currentCandidatePath);
-                            }
-                            /*
-                             * Update the entry of the selected NC
-                             */
-                            List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
-                            for (String candidate : candidateNcs) {
-                                int ncIndex = ncNameToIndex.get(candidate);
-                                if (workloads[ncIndex] < slotLimit) {
-                                    return candidate;
-                                }
-                            }
-                        }
-                        /* not scheduled */
-                        return null;
-                    } catch (Exception e) {
-                        throw new IllegalStateException(e);
-                    }
-                }
-
-                @Override
-                public int numAvailableSlots() {
-                    return availableIpsToSlots.size();
-                }
-
-                private int distance(List<Integer> splitPath, List<Integer> candidatePath) {
-                    int commonLength = Math.min(splitPath.size(), candidatePath.size());
-                    int distance = 0;
-                    for (int i = 0; i < commonLength; i++) {
-                        distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
-                    }
-                    List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
-                    for (int i = commonLength; i < restElements.size(); i++) {
-                        distance = distance * 100 + Math.abs(restElements.get(i));
-                    }
-                    return distance;
-                }
-            };
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-}
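
For reference, the rack-aware distance above treats two topology paths as base-100 digit strings and accumulates per-level absolute differences, so a mismatch near the top of the cluster topology dominates mismatches near the leaves. A standalone sketch of the same computation with small worked examples (hypothetical class name only):

    import java.util.List;

    final class TopologyDistanceSketch {
        /** Same accumulation as the removed distance(): earlier levels weigh 100x more. */
        static int distance(List<Integer> a, List<Integer> b) {
            int common = Math.min(a.size(), b.size());
            int d = 0;
            for (int i = 0; i < common; i++) {
                d = d * 100 + Math.abs(a.get(i) - b.get(i));
            }
            List<Integer> rest = a.size() > b.size() ? a : b;
            for (int i = common; i < rest.size(); i++) {
                d = d * 100 + Math.abs(rest.get(i));
            }
            return d;
        }
        // Examples: [0, 1] vs [0, 3]    -> 2    (same switch, nearby terminal)
        //           [0, 1, 5] vs [0, 1] -> 5    (one extra level counted on its own)
        //           [1, 0] vs [0, 0]    -> 100  (different top-level switch)
    }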

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
deleted file mode 100644
index 9633fb1..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs.scheduler;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Random;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.topology.ClusterTopology;
-import org.apache.hyracks.hdfs.api.INcCollection;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * The scheduler conducts data-local scheduling for data reads from HDFS. This
- * class works with the old Hadoop API.
- */
-public class Scheduler {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    /** a list of NCs */
-    private String[] NCs;
-
-    /** a map from ip to NCs */
-    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
-    /** a map from the NC name to the index */
-    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
-
-    /** a map from NC name to the NodeControllerInfo */
-    private Map<String, NodeControllerInfo> ncNameToNcInfos;
-
-    /**
-     * the nc collection builder
-     */
-    private INcCollectionBuilder ncCollectionBuilder;
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-
-    public Scheduler(String ipAddress, int port) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
-            ClusterTopology topology = hcc.getClusterTopology();
-            this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
-                    : new RackAwareNcCollectionBuilder(topology);
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
-            this.ncCollectionBuilder = ncCollectionBuilder;
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @throws HyracksException
-     */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        this.ncNameToNcInfos = ncNameToNcInfos;
-        this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
-        loadIPAddressToNCMap(ncNameToNcInfos);
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @param topology
-     *            the hyracks cluster topology
-     * @throws HyracksException
-     */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
-            throws HyracksException {
-        this(ncNameToNcInfos);
-        this.ncCollectionBuilder =
-                topology == null ? new IPProximityNcCollectionBuilder() : new RackAwareNcCollectionBuilder(topology);
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @throws HyracksException
-     */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
-            throws HyracksException {
-        this.ncNameToNcInfos = ncNameToNcInfos;
-        this.ncCollectionBuilder = ncCollectionBuilder;
-        loadIPAddressToNCMap(ncNameToNcInfos);
-    }
-
-    /**
-     * Set location constraints for a file scan operator with a list of file
-     * splits. It guarantees that the maximum number of slots a machine gets is
-     * at most one more than the minimum number of slots a machine gets.
-     *
-     * @throws HyracksDataException
-     */
-    public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
-        if (splits == null) {
-            /** handle the case where the splits array is null */
-            return new String[] {};
-        }
-        int[] workloads = new int[NCs.length];
-        Arrays.fill(workloads, 0);
-        String[] locations = new String[splits.length];
-        Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
-        /**
-         * upper bound number of slots that a machine can get
-         */
-        int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
-                : (splits.length / workloads.length + 1);
-        /**
-         * lower bound number of slots that a machine can get
-         */
-        int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
-
-        try {
-            Random random = new Random(System.currentTimeMillis());
-            boolean scheduled[] = new boolean[splits.length];
-            Arrays.fill(scheduled, false);
-            /**
-             * scan the splits and build the popularity map
-             * give the machines with less local splits more scheduling priority
-             */
-            buildPopularityMap(splits, locationToNumOfSplits);
-            /**
-             * push data-local lower-bounds slots to each machine
-             */
-            scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
-            /**
-             * push data-local upper-bounds slots to each machine
-             */
-            scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
-
-            int dataLocalCount = 0;
-            for (int i = 0; i < scheduled.length; i++) {
-                if (scheduled[i] == true) {
-                    dataLocalCount++;
-                }
-            }
-            LOGGER.info("Data local rate: "
-                    + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
-            /**
-             * push non-data-local lower-bounds slots to each machine
-             */
-            scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
-            /**
-             * push non-data-local upper-bounds slots to each machine
-             */
-            scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
-            return locations;
-        } catch (IOException e) {
-            throw HyracksException.create(e);
-        }
-    }
-
-    /**
-     * Schedule non-local slots to each machine
-     *
-     * @param splits
-     *            The HDFS file splits.
-     * @param workloads
-     *            The current capacity of each machine.
-     * @param locations
-     *            The result schedule.
-     * @param slotLimit
-     *            The maximum slots of each machine.
-     * @param scheduled
-     *            Indicate which slot is scheduled.
-     */
-    private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
-            boolean[] scheduled) throws IOException, UnknownHostException {
-        /**
-         * build the map from available ips to the number of available slots
-         */
-        INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
-                workloads, slotLimit);
-        if (ncCollection.numAvailableSlots() == 0) {
-            return;
-        }
-        /**
-         * schedule non-local file reads
-         */
-        for (int i = 0; i < splits.length; i++) {
-            /** if there is no data-local NC choice, pick the nearest available one */
-            if (!scheduled[i]) {
-                InputSplit split = splits[i];
-                String selectedNcName = ncCollection.findNearestAvailableSlot(split);
-                if (selectedNcName != null) {
-                    int ncIndex = ncNameToIndex.get(selectedNcName);
-                    workloads[ncIndex]++;
-                    scheduled[i] = true;
-                    locations[i] = selectedNcName;
-                }
-            }
-        }
-    }
-
-    /**
-     * Schedule data-local slots to each machine.
-     *
-     * @param splits
-     *            The HDFS file splits.
-     * @param workloads
-     *            The current capacity of each machine.
-     * @param locations
-     *            The result schedule.
-     * @param slots
-     *            The maximum slots of each machine.
-     * @param random
-     *            The random generator.
-     * @param scheduled
-     *            Indicate which slot is scheduled.
-     * @throws IOException
-     * @throws UnknownHostException
-     */
-    private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
-            boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits)
-            throws IOException, UnknownHostException {
-        /** scheduling candidates will be ordered inversely according to their popularity */
-        PriorityQueue<String> scheduleCandidates = new PriorityQueue<String>(3, new Comparator<String>() {
-
-            @Override
-            public int compare(String s1, String s2) {
-                return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
-            }
-
-        });
-        for (int i = 0; i < splits.length; i++) {
-            if (scheduled[i]) {
-                continue;
-            }
-            /**
-             * get the location of all the splits
-             */
-            String[] locs = splits[i].getLocations();
-            if (locs.length > 0) {
-                scheduleCandidates.clear();
-                for (int j = 0; j < locs.length; j++) {
-                    scheduleCandidates.add(locs[j]);
-                }
-
-                for (String candidate; (candidate = scheduleCandidates.poll()) != null;) {
-                    /**
-                     * get all the IP addresses from the name
-                     */
-                    InetAddress[] allIps = InetAddress.getAllByName(candidate);
-                    /**
-                     * iterate over all IPs
-                     */
-                    for (InetAddress ip : allIps) {
-                        /**
-                         * if the node controller exists
-                         */
-                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                            /**
-                             * set the ncs
-                             */
-                            List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                            int arrayPos = random.nextInt(dataLocations.size());
-                            String nc = dataLocations.get(arrayPos);
-                            int pos = ncNameToIndex.get(nc);
-                            /**
-                             * check if the node is already full
-                             */
-                            if (workloads[pos] < slots) {
-                                locations[i] = nc;
-                                workloads[pos]++;
-                                scheduled[i] = true;
-                                break;
-                            }
-                        }
-                    }
-                    /**
-                     * break the loop for data-locations if the schedule has
-                     * already been found
-                     */
-                    if (scheduled[i] == true) {
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Scan the splits once and build a popularity map
-     *
-     * @param splits
-     *            the split array
-     * @param locationToNumOfSplits
-     *            the map to be built
-     * @throws IOException
-     */
-    private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
-            throws IOException {
-        for (InputSplit split : splits) {
-            String[] locations = split.getLocations();
-            for (String loc : locations) {
-                IntWritable locCount = locationToNumOfSplits.get(loc);
-                if (locCount == null) {
-                    locCount = new IntWritable(0);
-                    locationToNumOfSplits.put(loc, locCount);
-                }
-                locCount.set(locCount.get() + 1);
-            }
-        }
-    }
-
-    /**
-     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
-     *
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        try {
-            NCs = new String[ncNameToNcInfos.size()];
-            ipToNcMapping.clear();
-            ncNameToIndex.clear();
-            int i = 0;
-
-            /*
-             * build the IP address to NC map
-             */
-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
-                        .getHostAddress();
-                List<String> matchedNCs = ipToNcMapping.computeIfAbsent(ipAddr, k -> new ArrayList<>());
-                matchedNCs.add(entry.getKey());
-                NCs[i] = entry.getKey();
-                i++;
-            }
-
-            /*
-             * set up the NC name to index mapping
-             */
-            for (i = 0; i < NCs.length; i++) {
-                ncNameToIndex.put(NCs[i], i);
-            }
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-}
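
For reference, the balance guarantee documented on getLocationConstraints above comes from its bound computation: every machine gets either floor(splits/NCs) or ceil(splits/NCs) slots. A standalone sketch of that arithmetic (illustrative helper, mirroring the removed code):

    final class SlotBoundsSketch {
        /** Returns {lowerBoundSlots, upperBoundSlots} as computed in the removed Scheduler. */
        static int[] slotBounds(int numSplits, int numNCs) {
            int upper = numSplits % numNCs == 0 ? numSplits / numNCs : numSplits / numNCs + 1;
            int lower = numSplits % numNCs == 0 ? upper : upper - 1;
            return new int[] { lower, upper };
        }
        // Example: 10 splits over 4 NCs -> lower = 2, upper = 3;
        //          12 splits over 4 NCs -> lower = upper = 3.
    }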

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
deleted file mode 100644
index b02a97b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.Serializable;
-
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ConfFactory implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private byte[] confBytes;
-
-    public ConfFactory(Job conf) throws HyracksDataException {
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(bos);
-            conf.getConfiguration().write(dos);
-            confBytes = bos.toByteArray();
-            dos.close();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    public Job getConf() throws HyracksDataException {
-        try {
-            Job conf = new Job();
-            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
-            conf.getConfiguration().readFields(dis);
-            dis.close();
-            return conf;
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-}
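
For reference, the removed ConfFactory just round-trips a Hadoop Job configuration through a byte array so it can travel with a serialized Hyracks job. A hedged usage sketch (names are illustrative; assumes the ConfFactory above is on the classpath):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    final class ConfFactoryUsageSketch {
        static String roundTrip() throws IOException, HyracksDataException {
            // On the client: capture the configured Job's state in a Serializable factory.
            Job job = Job.getInstance();
            job.getConfiguration().set("example.app.setting", "42"); // illustrative key
            ConfFactory confFactory = new ConfFactory(job);          // configuration -> bytes

            // On a worker (after Java serialization of the factory): rebuild an equivalent Job.
            Job restored = confFactory.getConf();
            return restored.getConfiguration().get("example.app.setting"); // "42"
        }
    }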

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
deleted file mode 100644
index 34b4c3e..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-@SuppressWarnings("rawtypes")
-public class FileSplitsFactory implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private byte[] splitBytes;
-    private String splitClassName;
-
-    public FileSplitsFactory(List<FileSplit> splits) throws HyracksDataException {
-        splitBytes = splitsToBytes(splits);
-        if (splits.size() > 0) {
-            splitClassName = splits.get(0).getClass().getName();
-        }
-    }
-
-    public List<FileSplit> getSplits() throws HyracksDataException {
-        return bytesToSplits(splitBytes);
-    }
-
-    /**
-     * Convert splits to bytes.
-     *
-     * @param splits
-     *            input splits
-     * @return bytes which serialize the splits
-     * @throws IOException
-     */
-    private byte[] splitsToBytes(List<FileSplit> splits) throws HyracksDataException {
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream(bos);
-            dos.writeInt(splits.size());
-            int size = splits.size();
-            for (int i = 0; i < size; i++) {
-                splits.get(i).write(dos);
-            }
-            dos.close();
-            return bos.toByteArray();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    /**
-     * Convert bytes to splits.
-     *
-     * @param bytes
-     * @return
-     * @throws HyracksDataException
-     */
-    private List<FileSplit> bytesToSplits(byte[] bytes) throws HyracksDataException {
-        try {
-            Class splitClass = Class.forName(splitClassName);
-            Constructor[] constructors = splitClass.getDeclaredConstructors();
-            Constructor defaultConstructor = null;
-            for (Constructor constructor : constructors) {
-                if (constructor.getParameterTypes().length == 0) {
-                    constructor.setAccessible(true);
-                    defaultConstructor = constructor;
-                }
-            }
-            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-            DataInputStream dis = new DataInputStream(bis);
-            int size = dis.readInt();
-            List<FileSplit> splits = new ArrayList<>();
-            for (int i = 0; i < size; i++) {
-                splits.add((FileSplit) defaultConstructor.newInstance());
-                splits.get(i).readFields(dis);
-            }
-            dis.close();
-            return splits;
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-}
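
For reference, FileSplitsFactory above serializes new-API FileSplits with their own write()/readFields() methods and locates a no-arg constructor reflectively (via setAccessible) so that deserialization does not depend on that constructor being public. A hedged usage sketch (names are illustrative; assumes the FileSplitsFactory above is on the classpath):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    final class FileSplitsFactoryUsageSketch {
        static List<FileSplit> roundTrip() throws HyracksDataException {
            // A single split covering the first 64 MB of an (illustrative) HDFS file, no preferred hosts.
            FileSplit split = new FileSplit(new Path("/data/example.txt"), 0, 64L << 20, new String[0]);
            FileSplitsFactory factory = new FileSplitsFactory(Arrays.asList(split));
            // The factory is Serializable; after shipping it to a worker, the splits are rebuilt:
            return factory.getSplits();
        }
    }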

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
deleted file mode 100644
index 5adec78..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-import org.apache.hyracks.hdfs.api.IKeyValueParser;
-import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
-
-/**
- * The HDFS file read operator using the Hadoop new API. To use this operator, a
- * user needs to provide an IKeyValueParserFactory implementation which converts
- * key-value pairs into tuples.
- */
-@SuppressWarnings("rawtypes")
-public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-    private final ConfFactory confFactory;
-    private final FileSplitsFactory splitsFactory;
-    private final String[] scheduledLocations;
-    private final IKeyValueParserFactory tupleParserFactory;
-    private final boolean[] executed;
-
-    /**
-     * The constructor of HDFSReadOperatorDescriptor.
-     *
-     * @param spec
-     *            the JobSpecification object
-     * @param rd
-     *            the output record descriptor
-     * @param conf
-     *            the Hadoop JobConf object, which contains the input format and
-     *            the input paths
-     * @param splits
-     *            the array of FileSplits (HDFS chunks).
-     * @param scheduledLocations
-     *            the node controller names that scan the FileSplits, a
-     *            one-to-one mapping. The String array is obtained from the
-     *            HDFS scheduler's getLocationConstraints(InputSplit[]) method.
-     * @param tupleParserFactory
-     *            the ITupleParserFactory implementation instance.
-     * @throws HyracksException
-     */
-    public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, List<InputSplit> splits,
-            String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
-        super(spec, 0, 1);
-        try {
-            List<FileSplit> fileSplits = new ArrayList<FileSplit>();
-            for (int i = 0; i < splits.size(); i++) {
-                fileSplits.add((FileSplit) splits.get(i));
-            }
-            this.splitsFactory = new FileSplitsFactory(fileSplits);
-            this.confFactory = new ConfFactory(conf);
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-        this.scheduledLocations = scheduledLocations;
-        this.executed = new boolean[scheduledLocations.length];
-        Arrays.fill(executed, false);
-        this.tupleParserFactory = tupleParserFactory;
-        this.outRecDescs[0] = rd;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
-        final List<FileSplit> inputSplits = splitsFactory.getSplits();
-
-        return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            private String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
-            private ContextFactory ctxFactory = new ContextFactory();
-
-            @SuppressWarnings("unchecked")
-            @Override
-            public void initialize() throws HyracksDataException {
-                ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-                try {
-                    writer.open();
-                    Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
-                    Job job = confFactory.getConf();
-                    job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
-                    IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
-                    InputFormat inputFormat =
-                            ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-                    int size = inputSplits.size();
-                    for (int i = 0; i < size; i++) {
-                        /**
-                         * read all the partitions scheduled to the current node
-                         */
-                        if (scheduledLocations[i].equals(nodeName)) {
-                            /**
-                             * pick an unread split to read; synchronize among
-                             * simultaneous partitions on the same machine
-                             */
-                            synchronized (executed) {
-                                if (executed[i] == false) {
-                                    executed[i] = true;
-                                } else {
-                                    continue;
-                                }
-                            }
-
-                            /**
-                             * read the split
-                             */
-                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
-                            context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
-                            RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
-                            reader.initialize(inputSplits.get(i), context);
-                            while (reader.nextKeyValue() == true) {
-                                parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer,
-                                        inputSplits.get(i).toString());
-                            }
-                        }
-                    }
-                    parser.close(writer);
-                } catch (Throwable th) {
-                    writer.fail();
-                    throw HyracksDataException.create(th);
-                } finally {
-                    writer.close();
-                    Thread.currentThread().setContextClassLoader(ctxCL);
-                }
-            }
-        };
-    }
-}
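
For reference, initialize() above lets several partitions on the same node share one list of splits by claiming each split under a lock on the shared executed array, so exactly one local partition reads any given split. A standalone sketch of that claim step (hypothetical helper, not part of the removed operator):

    final class SplitClaimSketch {
        /** Returns true iff the caller is the first partition on this node to claim split i. */
        static boolean claimSplit(boolean[] executed, int i) {
            synchronized (executed) {       // same lock object the removed operator synchronizes on
                if (!executed[i]) {
                    executed[i] = true;     // mark split i as taken
                    return true;
                }
                return false;               // another local partition already claimed it
            }
        }
    }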

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
deleted file mode 100644
index c691896..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.hdfs.api.ITupleWriter;
-import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
-
-/**
- * The HDFS file write operator using the Hadoop new API. To use this operator,
- * a user needs to provide an ITupleWriterFactory implementation
- * (see the usage sketch after this file's diff).
- */
-public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-    private ConfFactory confFactory;
-    private ITupleWriterFactory tupleWriterFactory;
-
-    /**
-     * The constructor of HDFSWriteOperatorDescriptor.
-     *
-     * @param spec
-     *            the JobSpecification object
-     * @param conf
-     *            the Hadoop JobConf which contains the output path
-     * @param tupleWriterFactory
-     *            the ITupleWriterFactory implementation object
-     * @throws HyracksException
-     */
-    public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, ITupleWriterFactory tupleWriterFactory)
-            throws HyracksException {
-        super(spec, 1, 0);
-        this.confFactory = new ConfFactory(conf);
-        this.tupleWriterFactory = tupleWriterFactory;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
-
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-
-            private FSDataOutputStream dos;
-            private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
-            private FrameTupleReference tuple = new FrameTupleReference();
-            private ITupleWriter tupleWriter;
-            private ClassLoader ctxCL;
-
-            @Override
-            public void open() throws HyracksDataException {
-                ctxCL = Thread.currentThread().getContextClassLoader();
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-                Job conf = confFactory.getConf();
-                String outputPath = FileOutputFormat.getOutputPath(conf).toString();
-                String fileName = outputPath + File.separator + "part-" + partition;
-
-                tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
-                try {
-                    FileSystem dfs = FileSystem.get(conf.getConfiguration());
-                    dos = dfs.create(new Path(fileName), true);
-                    tupleWriter.open(dos);
-                } catch (Exception e) {
-                    throw HyracksDataException.create(e);
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                accessor.reset(buffer);
-                int tupleCount = accessor.getTupleCount();
-                for (int i = 0; i < tupleCount; i++) {
-                    tuple.reset(accessor, i);
-                    tupleWriter.write(dos, tuple);
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                try {
-                    tupleWriter.close(dos);
-                    dos.close();
-                } catch (Exception e) {
-                    throw HyracksDataException.create(e);
-                } finally {
-                    Thread.currentThread().setContextClassLoader(ctxCL);
-                }
-            }
-
-        };
-    }
-}
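
For context, a minimal, hedged usage sketch for the operator deleted above. The class name
WriteOperatorSketch is illustrative, a Hadoop new-API Job is assumed, and the output path mirrors
HDFS_OUTPUT_PATH from the deleted DataflowTest further below.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
    import org.apache.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;

    public final class WriteOperatorSketch {
        public static HDFSWriteOperatorDescriptor build(JobSpecification jobSpec) throws Exception {
            Job job = Job.getInstance();                                        // new-API job wrapper
            FileOutputFormat.setOutputPath(job, new Path("/customer_result/")); // HDFS output directory
            // the operator writes one "part-<partition>" file per partition via the tuple writer
            return new HDFSWriteOperatorDescriptor(jobSpec, job, new TextTupleWriterFactory());
        }
    }

The deleted DataflowTest below wires the equivalent old-API operator the same way, passing a
JobConf instead of a Job.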

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
deleted file mode 100644
index f5b2384..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.scheduler;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.topology.ClusterTopology;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-
-/**
- * The scheduler conducts data-local scheduling for data reading on HDFS.
- * This class works with the Hadoop new API
- * (see the usage sketch after this file's diff).
- */
-@SuppressWarnings("deprecation")
-public class Scheduler {
-
-    private org.apache.hyracks.hdfs.scheduler.Scheduler scheduler;
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ipAddress
-     *            the IP address of the Hyracks cluster controller
-     * @param port
-     *            the client port of the Hyracks cluster controller
-     * @throws HyracksException
-     */
-    public Scheduler(String ipAddress, int port) throws HyracksException {
-        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @throws HyracksException
-     */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @param topology
-     *            the Hyracks cluster topology
-     * @throws HyracksException
-     */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
-            throws HyracksException {
-        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, topology);
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @param builder
-     *            the INcCollectionBuilder implementation to use
-     * @throws HyracksException
-     */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)
-            throws HyracksException {
-        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
-    }
-
-    /**
-     * Get the location constraints for a file scan operator, given a list of file splits.
-     *
-     * @throws HyracksException
-     */
-    public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
-        try {
-            org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
-            for (int i = 0; i < inputSplits.length; i++)
-                inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
-            return scheduler.getLocationConstraints(inputSplits);
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-}
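
Likewise, a hedged sketch of how this new-API scheduler front-end was used; the class name and
the ccHost/ccPort parameter names are illustrative, and the flow mirrors the old-API usage in the
deleted DataflowTest below.

    import java.util.List;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hyracks.hdfs2.scheduler.Scheduler;

    public final class SchedulerSketch {
        // Returns one node-controller name per split; callers typically pass the result to
        // PartitionConstraintHelper.addAbsoluteLocationConstraint for the HDFS read operator.
        public static String[] schedule(String ccHost, int ccPort, List<InputSplit> splits) throws Exception {
            Scheduler scheduler = new Scheduler(ccHost, ccPort);   // connects to the cluster controller
            return scheduler.getLocationConstraints(splits);       // data-local assignment per split
        }
    }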

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
deleted file mode 100644
index 6804f31..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs2.scheduler;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.mapred.InputSplit;
-
-/**
- * A wrapper implementation of the old-API InputSplit that lets the new-API
- * scheduler reuse the old-API scheduler
- * (see the round-trip sketch after this file's diff).
- */
-@SuppressWarnings("deprecation")
-public class WrappedFileSplit implements InputSplit {
-
-    private String[] locations;
-    private long length;
-
-    public WrappedFileSplit(String[] locations, long length) {
-        this.locations = locations;
-        this.length = length;
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        int len = input.readInt();
-        locations = new String[len];
-        for (int i = 0; i < len; i++)
-            locations[i] = input.readUTF();
-        length = input.readLong();
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        output.writeInt(locations.length); // must match the readInt in readFields
-        for (int i = 0; i < locations.length; i++)
-            output.writeUTF(locations[i]);
-        output.writeLong(length);
-    }
-
-    @Override
-    public long getLength() throws IOException {
-        return length;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException {
-        return locations;
-    }
-
-}
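
A hedged round-trip sketch for the wrapper's Writable contract; the class name is illustrative.
The location count has to be written with writeInt so that readFields(), which reads it back with
readInt, reconstructs the split correctly.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hyracks.hdfs2.scheduler.WrappedFileSplit;

    public final class WrappedFileSplitRoundTrip {
        public static void main(String[] args) throws Exception {
            WrappedFileSplit original = new WrappedFileSplit(new String[] { "nc1", "nc2" }, 64L);

            // serialize with write(), then rebuild an empty split with readFields()
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            WrappedFileSplit copy = new WrappedFileSplit(null, 0L);
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(copy.getLocations().length + " locations, length " + copy.getLength());
        }
    }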

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
deleted file mode 100644
index 8f96bab..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs.dataflow;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.hdfs.lib.RawBinaryComparatorFactory;
-import org.apache.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
-import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;
-import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
-import org.apache.hyracks.hdfs.utils.HyracksUtils;
-import org.apache.hyracks.test.support.TestUtils;
-import org.apache.hyracks.util.file.FileUtil;
-import org.junit.Assert;
-
-import junit.framework.TestCase;
-
-/**
- * Test the org.apache.hyracks.hdfs.dataflow package,
- * the operators for the Hadoop old API.
- */
-@SuppressWarnings({ "deprecation" })
-public class DataflowTest extends TestCase {
-
-    protected static final String ACTUAL_RESULT_DIR = FileUtil.joinPath("target", "actual");
-    private static final String TEST_RESOURCES = FileUtil.joinPath("src", "test", "resources");
-    protected static final String EXPECTED_RESULT_PATH = FileUtil.joinPath(TEST_RESOURCES, "expected");
-    private static final String PATH_TO_HADOOP_CONF = FileUtil.joinPath(TEST_RESOURCES, "hadoop", "conf");
-    protected static final String BUILD_DIR = FileUtil.joinPath("target", "build");
-
-    private static final String DATA_PATH = FileUtil.joinPath(TEST_RESOURCES, "data", "customer.tbl");
-    protected static final String HDFS_INPUT_PATH = "/customer/";
-    protected static final String HDFS_OUTPUT_PATH = "/customer_result/";
-
-    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private static final String MINIDFS_BASEDIR = FileUtil.joinPath("target", "hdfs");
-    private MiniDFSCluster dfsCluster;
-
-    private JobConf conf = new JobConf();
-    private int numberOfNC = 2;
-
-    @Override
-    public void setUp() throws Exception {
-        cleanupStores();
-        HyracksUtils.init();
-        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        startHDFS();
-    }
-
-    private void cleanupStores() throws IOException {
-        FileUtils.forceMkdir(new File(MINIDFS_BASEDIR));
-        FileUtils.cleanDirectory(new File(MINIDFS_BASEDIR));
-    }
-
-    protected Configuration getConfiguration() {
-        return conf;
-    }
-
-    protected MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws IOException {
-        return new MiniDFSCluster(conf, numberOfNC, true, null);
-    }
-
-    /**
-     * Start the HDFS cluster and set up the data files
-     *
-     * @throws IOException
-     */
-    protected void startHDFS() throws IOException {
-        getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-        getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-        getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        lfs.delete(new Path(BUILD_DIR), true);
-        System.setProperty("hadoop.log.dir", FileUtil.joinPath("target", "logs"));
-        getConfiguration().set("hdfs.minidfs.basedir", MINIDFS_BASEDIR);
-        dfsCluster = getMiniDFSCluster(getConfiguration(), numberOfNC);
-        FileSystem dfs = FileSystem.get(getConfiguration());
-        Path src = new Path(DATA_PATH);
-        Path dest = new Path(HDFS_INPUT_PATH);
-        Path result = new Path(HDFS_OUTPUT_PATH);
-        dfs.mkdirs(dest);
-        dfs.mkdirs(result);
-        dfs.copyFromLocalFile(src, dest);
-
-        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
-        getConfiguration().writeXml(confOutput);
-        confOutput.flush();
-        confOutput.close();
-    }
-
-    /**
-     * Test a job with only HDFS reads and writes.
-     *
-     * @throws Exception
-     */
-    public void testHDFSReadWriteOperators() throws Exception {
-        FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
-        FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-        conf.setInputFormat(TextInputFormat.class);
-
-        Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
-        InputSplit[] splits = conf.getInputFormat().getSplits(conf, numberOfNC * 4);
-
-        String[] readSchedule = scheduler.getLocationConstraints(splits);
-        JobSpecification jobSpec = new JobSpecification();
-        RecordDescriptor recordDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
-
-        String[] locations =
-                new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
-        HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
-                readSchedule, new TextKeyValueParserFactory());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, locations);
-
-        ExternalSortOperatorDescriptor sortOperator = new ExternalSortOperatorDescriptor(jobSpec, 10, new int[] { 0 },
-                new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, recordDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, sortOperator, locations);
-
-        HDFSWriteOperatorDescriptor writeOperator =
-                new HDFSWriteOperatorDescriptor(jobSpec, conf, new TextTupleWriterFactory());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, HyracksUtils.NC1_ID);
-
-        jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
-        jobSpec.connect(
-                new MToNPartitioningMergingConnectorDescriptor(jobSpec,
-                        new FieldHashPartitionComputerFactory(new int[] { 0 },
-                                new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
-                        new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
-                sortOperator, 0, writeOperator, 0);
-        jobSpec.addRoot(writeOperator);
-
-        IHyracksClientConnection client =
-                new HyracksConnection(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
-        JobId jobId = client.startJob(jobSpec);
-        client.waitForCompletion(jobId);
-
-        Assert.assertTrue(checkResults());
-    }
-
-    /**
-     * Check if the results are correct
-     *
-     * @return true if correct
-     * @throws Exception
-     */
-    protected boolean checkResults() throws Exception {
-        FileSystem dfs = FileSystem.get(getConfiguration());
-        Path result = new Path(HDFS_OUTPUT_PATH);
-        Path actual = new Path(ACTUAL_RESULT_DIR);
-        dfs.copyToLocalFile(result, actual);
-
-        TestUtils.compareWithResult(new File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")),
-                new File(FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result", "part-0")));
-        return true;
-    }
-
-    /**
-     * Clean up the HDFS cluster.
-     */
-    private void cleanupHDFS() throws Exception {
-        dfsCluster.shutdown();
-    }
-
-    @Override
-    public void tearDown() throws Exception {
-        HyracksUtils.deinit();
-        cleanupHDFS();
-    }
-
-}