Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/06/18 17:49:34 UTC
[2/6] asterixdb git commit: [NO ISSUE] Remove obsolete support for older HDFS versions
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
new file mode 100644
index 0000000..9633fb1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.scheduler;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.hdfs.api.INcCollection;
+import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * The scheduler conducts data-local scheduling for data reads on HDFS. This
+ * class works with the Hadoop old API.
+ */
+public class Scheduler {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ /** a list of NCs */
+ private String[] NCs;
+
+ /** a map from ip to NCs */
+ private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+
+ /** a map from the NC name to the index */
+ private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+
+ /** a map from NC name to the NodeControllerInfo */
+ private Map<String, NodeControllerInfo> ncNameToNcInfos;
+
+ /**
+ * the nc collection builder
+ */
+ private INcCollectionBuilder ncCollectionBuilder;
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ipAddress
+ * the IP address of the Hyracks cluster controller
+ * @param port
+ * the client port of the Hyracks cluster controller
+ * @throws HyracksException
+ */
+ public Scheduler(String ipAddress, int port) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+ ClusterTopology topology = hcc.getClusterTopology();
+ this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
+ : new RackAwareNcCollectionBuilder(topology);
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ } catch (Exception e) {
+ throw HyracksException.create(e);
+ }
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ipAddress
+ * the IP address of the Hyracks cluster controller
+ * @param port
+ * the client port of the Hyracks cluster controller
+ * @param ncCollectionBuilder
+ * the NC collection builder to use
+ * @throws HyracksException
+ */
+ public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+ this.ncCollectionBuilder = ncCollectionBuilder;
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ } catch (Exception e) {
+ throw HyracksException.create(e);
+ }
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ this.ncNameToNcInfos = ncNameToNcInfos;
+ this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @param topology
+ * the Hyracks cluster topology
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
+ throws HyracksException {
+ this(ncNameToNcInfos);
+ this.ncCollectionBuilder =
+ topology == null ? new IPProximityNcCollectionBuilder() : new RackAwareNcCollectionBuilder(topology);
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @param ncCollectionBuilder
+ * the NC collection builder to use
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
+ throws HyracksException {
+ this.ncNameToNcInfos = ncNameToNcInfos;
+ this.ncCollectionBuilder = ncCollectionBuilder;
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ }
+
+ /**
+ * Set location constraints for a file scan operator with a list of file
+ * splits. It guarantees that the maximum number of slots a machine gets is
+ * at most one more than the minimum number of slots a machine gets.
+ *
+ * @throws HyracksException
+ */
+ public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
+ if (splits == null) {
+ /** handle the case where the splits array is null */
+ return new String[] {};
+ }
+ int[] workloads = new int[NCs.length];
+ Arrays.fill(workloads, 0);
+ String[] locations = new String[splits.length];
+ Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
+ /**
+ * upper bound number of slots that a machine can get
+ */
+ int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
+ : (splits.length / workloads.length + 1);
+ /**
+ * lower bound number of slots that a machine can get
+ */
+ int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
+
+ try {
+ Random random = new Random(System.currentTimeMillis());
+ boolean[] scheduled = new boolean[splits.length];
+ /**
+ * scan the splits and build the popularity map
+ * give the machines with less local splits more scheduling priority
+ */
+ buildPopularityMap(splits, locationToNumOfSplits);
+ /**
+ * push data-local lower-bounds slots to each machine
+ */
+ scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
+ /**
+ * push data-local upper-bounds slots to each machine
+ */
+ scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
+
+ int dataLocalCount = 0;
+ for (int i = 0; i < scheduled.length; i++) {
+ if (scheduled[i]) {
+ dataLocalCount++;
+ }
+ }
+ LOGGER.info("Data local rate: "
+ + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
+ /**
+ * push non-data-local lower-bounds slots to each machine
+ */
+ scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
+ /**
+ * push non-data-local upper-bounds slots to each machine
+ */
+ scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
+ return locations;
+ } catch (IOException e) {
+ throw HyracksException.create(e);
+ }
+ }
+
+ /**
+ * Schedule non-local slots to each machine.
+ *
+ * @param splits
+ * The HDFS file splits.
+ * @param workloads
+ * The current workload of each machine.
+ * @param locations
+ * The resulting schedule.
+ * @param slotLimit
+ * The maximum number of slots per machine.
+ * @param scheduled
+ * Indicates which splits have already been scheduled.
+ */
+ private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
+ boolean[] scheduled) throws IOException, UnknownHostException {
+ /**
+ * build the map from available ips to the number of available slots
+ */
+ INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
+ workloads, slotLimit);
+ if (ncCollection.numAvailableSlots() == 0) {
+ return;
+ }
+ /**
+ * schedule non-local file reads
+ */
+ for (int i = 0; i < splits.length; i++) {
+ /** the split had no data-local assignment; pick the nearest available NC */
+ if (!scheduled[i]) {
+ InputSplit split = splits[i];
+ String selectedNcName = ncCollection.findNearestAvailableSlot(split);
+ if (selectedNcName != null) {
+ int ncIndex = ncNameToIndex.get(selectedNcName);
+ workloads[ncIndex]++;
+ scheduled[i] = true;
+ locations[i] = selectedNcName;
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule data-local slots to each machine.
+ *
+ * @param splits
+ * The HDFS file splits.
+ * @param workloads
+ * The current workload of each machine.
+ * @param locations
+ * The resulting schedule.
+ * @param slots
+ * The maximum number of slots per machine.
+ * @param random
+ * The random generator.
+ * @param scheduled
+ * Indicates which splits have already been scheduled.
+ * @throws IOException
+ * @throws UnknownHostException
+ */
+ private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
+ boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits)
+ throws IOException, UnknownHostException {
+ /** scheduling candidates are prioritized inversely to their popularity */
+ PriorityQueue<String> scheduleCandidates = new PriorityQueue<String>(3, new Comparator<String>() {
+
+ @Override
+ public int compare(String s1, String s2) {
+ return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
+ }
+
+ });
+ for (int i = 0; i < splits.length; i++) {
+ if (scheduled[i]) {
+ continue;
+ }
+ /**
+ * get the location of all the splits
+ */
+ String[] locs = splits[i].getLocations();
+ if (locs.length > 0) {
+ scheduleCandidates.clear();
+ for (int j = 0; j < locs.length; j++) {
+ scheduleCandidates.add(locs[j]);
+ }
+
+ /** poll in priority order; iterating a PriorityQueue directly is unordered */
+ String candidate;
+ while ((candidate = scheduleCandidates.poll()) != null) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(candidate);
+ /**
+ * iterate over all IPs
+ */
+ for (InetAddress ip : allIps) {
+ /**
+ * if the node controller exists
+ */
+ if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+ /**
+ * set the ncs
+ */
+ List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+ int arrayPos = random.nextInt(dataLocations.size());
+ String nc = dataLocations.get(arrayPos);
+ int pos = ncNameToIndex.get(nc);
+ /**
+ * check if the node is already full
+ */
+ if (workloads[pos] < slots) {
+ locations[i] = nc;
+ workloads[pos]++;
+ scheduled[i] = true;
+ break;
+ }
+ }
+ }
+ /**
+ * stop scanning candidate locations once this split
+ * has been scheduled
+ */
+ if (scheduled[i]) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Scan the splits once and build a popularity map
+ *
+ * @param splits
+ * the split array
+ * @param locationToNumOfSplits
+ * the map to be built
+ * @throws IOException
+ */
+ private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
+ throws IOException {
+ for (InputSplit split : splits) {
+ String[] locations = split.getLocations();
+ for (String loc : locations) {
+ IntWritable locCount = locationToNumOfSplits.get(loc);
+ if (locCount == null) {
+ locCount = new IntWritable(0);
+ locationToNumOfSplits.put(loc, locCount);
+ }
+ locCount.set(locCount.get() + 1);
+ }
+ }
+ }
+
+ /**
+ * Load the IP-address-to-NC map from the NCNameToNCInfoMap
+ *
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ try {
+ NCs = new String[ncNameToNcInfos.size()];
+ ipToNcMapping.clear();
+ ncNameToIndex.clear();
+ int i = 0;
+
+ /*
+ * build the IP address to NC map
+ */
+ for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+ String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
+ .getHostAddress();
+ List<String> matchedNCs = ipToNcMapping.computeIfAbsent(ipAddr, k -> new ArrayList<>());
+ matchedNCs.add(entry.getKey());
+ NCs[i] = entry.getKey();
+ i++;
+ }
+
+ /*
+ * set up the NC name to index mapping
+ */
+ for (i = 0; i < NCs.length; i++) {
+ ncNameToIndex.put(NCs[i], i);
+ }
+ } catch (Exception e) {
+ throw HyracksException.create(e);
+ }
+ }
+}
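For context, a minimal usage sketch of this old-API scheduler (not part of the patch; how the NC map and splits are obtained is up to the caller): build it from the NC map and ask for one NC name per split.

    import java.util.Map;

    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hyracks.api.client.NodeControllerInfo;
    import org.apache.hyracks.hdfs.scheduler.Scheduler;

    public class SchedulerUsageSketch {
        // Returns one NC name per split; the split count may exceed the NC count.
        public static String[] schedule(Map<String, NodeControllerInfo> ncInfos, InputSplit[] splits)
                throws Exception {
            Scheduler scheduler = new Scheduler(ncInfos); // defaults to the IP-proximity builder
            return scheduler.getLocationConstraints(splits);
        }
    }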
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
new file mode 100644
index 0000000..b02a97b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs2.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ConfFactory implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private byte[] confBytes;
+
+ public ConfFactory(Job conf) throws HyracksDataException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ conf.getConfiguration().write(dos);
+ confBytes = bos.toByteArray();
+ dos.close();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public Job getConf() throws HyracksDataException {
+ try {
+ Job conf = new Job();
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
+ conf.getConfiguration().readFields(dis);
+ dis.close();
+ return conf;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
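A round-trip sketch of ConfFactory (illustrative only; the configuration key shown is a standard Hadoop 2 property, not something this patch defines):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hyracks.hdfs2.dataflow.ConfFactory;

    public class ConfFactorySketch {
        public static void main(String[] args) throws Exception {
            Job job = new Job(); // deprecated no-arg constructor, mirroring ConfFactory.getConf()
            job.getConfiguration().set("mapreduce.input.fileinputformat.inputdir", "/customer/");
            ConfFactory factory = new ConfFactory(job); // serialized on the client side
            Job restored = factory.getConf();           // deserialized at the task site
            System.out.println(restored.getConfiguration().get("mapreduce.input.fileinputformat.inputdir"));
        }
    }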
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
new file mode 100644
index 0000000..34b4c3e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs2.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings("rawtypes")
+public class FileSplitsFactory implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private byte[] splitBytes;
+ private String splitClassName;
+
+ public FileSplitsFactory(List<FileSplit> splits) throws HyracksDataException {
+ splitBytes = splitsToBytes(splits);
+ if (splits.size() > 0) {
+ splitClassName = splits.get(0).getClass().getName();
+ }
+ }
+
+ public List<FileSplit> getSplits() throws HyracksDataException {
+ return bytesToSplits(splitBytes);
+ }
+
+ /**
+ * Convert splits to bytes.
+ *
+ * @param splits
+ * input splits
+ * @return bytes which serialize the splits
+ * @throws HyracksDataException
+ */
+ private byte[] splitsToBytes(List<FileSplit> splits) throws HyracksDataException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeInt(splits.size());
+ int size = splits.size();
+ for (int i = 0; i < size; i++) {
+ splits.get(i).write(dos);
+ }
+ dos.close();
+ return bos.toByteArray();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ /**
+ * Convert bytes to splits.
+ *
+ * @param bytes
+ * the serialized splits
+ * @return the reconstructed list of splits
+ * @throws HyracksDataException
+ */
+ private List<FileSplit> bytesToSplits(byte[] bytes) throws HyracksDataException {
+ try {
+ if (splitClassName == null) {
+ /** no splits were serialized, so there is nothing to reconstruct */
+ return new ArrayList<>();
+ }
+ Class splitClass = Class.forName(splitClassName);
+ Constructor[] constructors = splitClass.getDeclaredConstructors();
+ Constructor defaultConstructor = null;
+ for (Constructor constructor : constructors) {
+ if (constructor.getParameterTypes().length == 0) {
+ constructor.setAccessible(true);
+ defaultConstructor = constructor;
+ break;
+ }
+ }
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bis);
+ int size = dis.readInt();
+ List<FileSplit> splits = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ splits.add((FileSplit) defaultConstructor.newInstance());
+ splits.get(i).readFields(dis);
+ }
+ dis.close();
+ return splits;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
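And a corresponding round-trip sketch for FileSplitsFactory (hypothetical path and host); the splits are rebuilt reflectively through the split class's default constructor:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;

    public class FileSplitsFactorySketch {
        public static void main(String[] args) throws Exception {
            List<FileSplit> splits = Arrays.asList(
                    new FileSplit(new Path("/customer/part-1"), 0, 1024, new String[] { "10.0.0.1" }));
            FileSplitsFactory factory = new FileSplitsFactory(splits); // Serializable holder
            List<FileSplit> restored = factory.getSplits();            // rebuilt via reflection
            System.out.println(restored.get(0).getPath() + " " + restored.get(0).getLength());
        }
    }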
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
new file mode 100644
index 0000000..5adec78
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs2.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.hdfs.ContextFactory;
+import org.apache.hyracks.hdfs.api.IKeyValueParser;
+import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
+
+/**
+ * The HDFS file read operator using the Hadoop new API. To use this operator, a
+ * user needs to provide an IKeyValueParserFactory implementation that converts
+ * key-value pairs into tuples.
+ */
+@SuppressWarnings("rawtypes")
+public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final ConfFactory confFactory;
+ private final FileSplitsFactory splitsFactory;
+ private final String[] scheduledLocations;
+ private final IKeyValueParserFactory tupleParserFactory;
+ private final boolean[] executed;
+
+ /**
+ * The constructor of HDFSReadOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param rd
+ * the output record descriptor
+ * @param conf
+ * the Hadoop JobConf object, which contains the input format and
+ * the input paths
+ * @param splits
+ * the array of FileSplits (HDFS chunks).
+ * @param scheduledLocations
+ * the node controller names to scan the FileSplits, a
+ * one-to-one mapping. The String array is obtained from
+ * org.apache.hyracks.hdfs2.scheduler.Scheduler
+ * .getLocationConstraints(splits).
+ * @param tupleParserFactory
+ * the ITupleParserFactory implementation instance.
+ * @throws HyracksException
+ */
+ public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, List<InputSplit> splits,
+ String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
+ super(spec, 0, 1);
+ try {
+ List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+ for (int i = 0; i < splits.size(); i++) {
+ fileSplits.add((FileSplit) splits.get(i));
+ }
+ this.splitsFactory = new FileSplitsFactory(fileSplits);
+ this.confFactory = new ConfFactory(conf);
+ } catch (Exception e) {
+ throw HyracksException.create(e);
+ }
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
+ this.tupleParserFactory = tupleParserFactory;
+ this.outRecDescs[0] = rd;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final List<FileSplit> inputSplits = splitsFactory.getSplits();
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
+ private ContextFactory ctxFactory = new ContextFactory();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize() throws HyracksDataException {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ writer.open();
+ Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
+ Job job = confFactory.getConf();
+ job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+ InputFormat inputFormat =
+ ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+ int size = inputSplits.size();
+ for (int i = 0; i < size; i++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (scheduledLocations[i].equals(nodeName)) {
+ /**
+ * pick an unread split to read; synchronize among
+ * simultaneous partitions on the same machine
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
+ reader.initialize(inputSplits.get(i), context);
+ while (reader.nextKeyValue()) {
+ parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer,
+ inputSplits.get(i).toString());
+ }
+ }
+ }
+ parser.close(writer);
+ } catch (Throwable th) {
+ writer.fail();
+ throw HyracksDataException.create(th);
+ } finally {
+ writer.close();
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+ };
+ }
+}
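A wiring sketch for the read operator (assumptions: a reachable cluster controller at the placeholder address below, and a Job whose configuration already names an input format and input paths):

    import java.util.List;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;
    import org.apache.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
    import org.apache.hyracks.hdfs2.scheduler.Scheduler;

    public class ReadWiringSketch {
        static HDFSReadOperatorDescriptor addReader(JobSpecification spec, RecordDescriptor rd, Job job,
                List<InputSplit> splits) throws Exception {
            Scheduler scheduler = new Scheduler("127.0.0.1", 1098); // hypothetical CC host/port
            String[] schedule = scheduler.getLocationConstraints(splits); // one NC name per split
            HDFSReadOperatorDescriptor reader = new HDFSReadOperatorDescriptor(spec, rd, job, splits,
                    schedule, new TextKeyValueParserFactory());
            // Run one reader partition per scheduled NC so each split finds its host.
            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, reader, schedule);
            return reader;
        }
    }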
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..c691896
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs2.dataflow;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import org.apache.hyracks.hdfs.api.ITupleWriter;
+import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
+
+/**
+ * The HDFS file write operator using the Hadoop new API. To use this operator,
+ * a user needs to provide an ITupleWriterFactory implementation.
+ */
+public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+ private ITupleWriterFactory tupleWriterFactory;
+
+ /**
+ * The constructor of HDFSWriteOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param conf
+ * the Hadoop JobConf which contains the output path
+ * @param tupleWriterFactory
+ * the ITupleWriterFactory implementation object
+ * @throws HyracksException
+ */
+ public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, ITupleWriterFactory tupleWriterFactory)
+ throws HyracksException {
+ super(spec, 1, 0);
+ this.confFactory = new ConfFactory(conf);
+ this.tupleWriterFactory = tupleWriterFactory;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+ private FSDataOutputStream dos;
+ private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
+ private FrameTupleReference tuple = new FrameTupleReference();
+ private ITupleWriter tupleWriter;
+ private ClassLoader ctxCL;
+
+ @Override
+ public void open() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Job conf = confFactory.getConf();
+ String outputPath = FileOutputFormat.getOutputPath(conf).toString();
+ String fileName = outputPath + File.separator + "part-" + partition;
+
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
+ try {
+ FileSystem dfs = FileSystem.get(conf.getConfiguration());
+ dos = dfs.create(new Path(fileName), true);
+ tupleWriter.open(dos);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ tupleWriter.write(dos, tuple);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ tupleWriter.close(dos);
+ dos.close();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ };
+ }
+}
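A matching sketch for the write side (the NC ids are placeholders; the Job must already carry an output path set via FileOutputFormat.setOutputPath):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
    import org.apache.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;

    public class WriteWiringSketch {
        static HDFSWriteOperatorDescriptor addWriter(JobSpecification spec, Job job, String[] ncIds)
                throws Exception {
            HDFSWriteOperatorDescriptor writer =
                    new HDFSWriteOperatorDescriptor(spec, job, new TextTupleWriterFactory());
            // Each partition writes its own part-<partition> file under the output path.
            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, ncIds);
            return writer;
        }
    }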
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
new file mode 100644
index 0000000..f5b2384
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs2.scheduler;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
+
+/**
+ * The scheduler conducts data-local scheduling for data reads on HDFS.
+ * This class works with the Hadoop new API.
+ */
+@SuppressWarnings("deprecation")
+public class Scheduler {
+
+ private org.apache.hyracks.hdfs.scheduler.Scheduler scheduler;
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ipAddress
+ * the IP address of the Hyracks cluster controller
+ * @param port
+ * the client port of the Hyracks cluster controller
+ * @throws HyracksException
+ */
+ public Scheduler(String ipAddress, int port) throws HyracksException {
+ scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @param topology
+ * the Hyracks cluster topology
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
+ throws HyracksException {
+ scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, topology);
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @param builder
+ * the NC collection builder to use
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)
+ throws HyracksException {
+ scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
+ }
+
+ /**
+ * Set location constraints for a file scan operator with a list of file splits.
+ *
+ * @throws HyracksException
+ */
+ public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
+ try {
+ org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
+ for (int i = 0; i < inputSplits.length; i++) {
+ inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
+ }
+ return scheduler.getLocationConstraints(inputSplits);
+ } catch (Exception e) {
+ throw HyracksException.create(e);
+ }
+ }
+}
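A usage sketch for the new-API wrapper (the CC address is again a placeholder): it simply wraps each new-API split in a WrappedFileSplit and delegates to the old-API scheduler.

    import java.util.List;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hyracks.hdfs2.scheduler.Scheduler;

    public class NewApiSchedulerSketch {
        static String[] schedule(List<InputSplit> splits) throws Exception {
            Scheduler scheduler = new Scheduler("127.0.0.1", 1098); // hypothetical CC host/port
            return scheduler.getLocationConstraints(splits);
        }
    }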
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
new file mode 100644
index 0000000..6804f31
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs2.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * The wrapped implementation of InputSplit, for the new API scheduler
+ * to reuse the old API scheduler
+ */
+@SuppressWarnings("deprecation")
+public class WrappedFileSplit implements InputSplit {
+
+ private String[] locations;
+ private long length;
+
+ public WrappedFileSplit(String[] locations, long length) {
+ this.locations = locations;
+ this.length = length;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int len = input.readInt();
+ locations = new String[len];
+ for (int i = 0; i < len; i++) {
+ locations[i] = input.readUTF();
+ }
+ length = input.readLong();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ /** writeInt, not write: readFields reads the count back with readInt */
+ output.writeInt(locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ output.writeUTF(locations[i]);
+ }
+ output.writeLong(length);
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return locations;
+ }
+
+}
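A quick serialization round-trip sketch for WrappedFileSplit, exercising the write/readFields symmetry (writeInt paired with readInt):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hyracks.hdfs2.scheduler.WrappedFileSplit;

    public class WrappedFileSplitSketch {
        public static void main(String[] args) throws IOException {
            WrappedFileSplit split = new WrappedFileSplit(new String[] { "10.0.0.1", "10.0.0.2" }, 1024L);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            split.write(new DataOutputStream(bos));
            WrappedFileSplit copy = new WrappedFileSplit(null, 0); // fields are filled in by readFields
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            System.out.println(copy.getLength() + " " + copy.getLocations().length); // 1024 2
        }
    }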
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/MiniDFSClusterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/MiniDFSClusterFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/MiniDFSClusterFactory.java
new file mode 100644
index 0000000..2884e28
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/MiniDFSClusterFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MiniDFSClusterFactory {
+
+ public MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws HyracksDataException {
+ try {
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ builder.numDataNodes(numberOfNC);
+ MiniDFSCluster dfsCluster = builder.build();
+ return dfsCluster;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
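A minimal sketch of the factory in a test fixture (the base directory is an arbitrary choice; the DataflowTest below uses the same property):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hyracks.hdfs.MiniDFSClusterFactory;

    public class MiniDFSSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("hdfs.minidfs.basedir", "target/hdfs"); // keep test data under target/
            MiniDFSCluster cluster = new MiniDFSClusterFactory().getMiniDFSCluster(conf, 2);
            try {
                System.out.println(cluster.getFileSystem().getUri());
            } finally {
                cluster.shutdown();
            }
        }
    }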
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
new file mode 100644
index 0000000..8f96bab
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.dataflow;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.hdfs.lib.RawBinaryComparatorFactory;
+import org.apache.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
+import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;
+import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
+import org.apache.hyracks.hdfs.utils.HyracksUtils;
+import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.file.FileUtil;
+import org.junit.Assert;
+
+import junit.framework.TestCase;
+
+/**
+ * Test the org.apache.hyracks.hdfs.dataflow package,
+ * the operators for the Hadoop old API.
+ */
+@SuppressWarnings({ "deprecation" })
+public class DataflowTest extends TestCase {
+
+ protected static final String ACTUAL_RESULT_DIR = FileUtil.joinPath("target", "actual");
+ private static final String TEST_RESOURCES = FileUtil.joinPath("src", "test", "resources");
+ protected static final String EXPECTED_RESULT_PATH = FileUtil.joinPath(TEST_RESOURCES, "expected");
+ private static final String PATH_TO_HADOOP_CONF = FileUtil.joinPath(TEST_RESOURCES, "hadoop", "conf");
+ protected static final String BUILD_DIR = FileUtil.joinPath("target", "build");
+
+ private static final String DATA_PATH = FileUtil.joinPath(TEST_RESOURCES, "data", "customer.tbl");
+ protected static final String HDFS_INPUT_PATH = "/customer/";
+ protected static final String HDFS_OUTPUT_PATH = "/customer_result/";
+
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private static final String MINIDFS_BASEDIR = FileUtil.joinPath("target", "hdfs");
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+
+ @Override
+ public void setUp() throws Exception {
+ cleanupStores();
+ HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File(MINIDFS_BASEDIR));
+ FileUtils.cleanDirectory(new File(MINIDFS_BASEDIR));
+ }
+
+ protected Configuration getConfiguration() {
+ return conf;
+ }
+
+ protected MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws IOException {
+ return new MiniDFSCluster(conf, numberOfNC, true, null);
+ }
+
+ /**
+ * Start the HDFS cluster and set up the data files.
+ *
+ * @throws IOException
+ */
+ protected void startHDFS() throws IOException {
+ getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path(BUILD_DIR), true);
+ System.setProperty("hadoop.log.dir", FileUtil.joinPath("target", "logs"));
+ getConfiguration().set("hdfs.minidfs.basedir", MINIDFS_BASEDIR);
+ dfsCluster = getMiniDFSCluster(getConfiguration(), numberOfNC);
+ FileSystem dfs = FileSystem.get(getConfiguration());
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_INPUT_PATH);
+ Path result = new Path(HDFS_OUTPUT_PATH);
+ dfs.mkdirs(dest);
+ dfs.mkdirs(result);
+ dfs.copyFromLocalFile(src, dest);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ getConfiguration().writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ /**
+ * Test a job with only HDFS read and writes.
+ *
+ * @throws Exception
+ */
+ public void testHDFSReadWriteOperators() throws Exception {
+ FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+ conf.setInputFormat(TextInputFormat.class);
+
+ Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ InputSplit[] splits = conf.getInputFormat().getSplits(conf, numberOfNC * 4);
+
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ JobSpecification jobSpec = new JobSpecification();
+ RecordDescriptor recordDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+
+ String[] locations =
+ new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
+ HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
+ readSchedule, new TextKeyValueParserFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, locations);
+
+ ExternalSortOperatorDescriptor sortOperator = new ExternalSortOperatorDescriptor(jobSpec, 10, new int[] { 0 },
+ new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, recordDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, sortOperator, locations);
+
+ HDFSWriteOperatorDescriptor writeOperator =
+ new HDFSWriteOperatorDescriptor(jobSpec, conf, new TextTupleWriterFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, HyracksUtils.NC1_ID);
+
+ jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
+ jobSpec.connect(
+ new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
+ new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+ sortOperator, 0, writeOperator, 0);
+ jobSpec.addRoot(writeOperator);
+
+ IHyracksClientConnection client =
+ new HyracksConnection(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ JobId jobId = client.startJob(jobSpec);
+ client.waitForCompletion(jobId);
+
+ Assert.assertTrue(checkResults());
+ }
+
+ /**
+ * Check if the results are correct
+ *
+ * @return true if correct
+ * @throws Exception
+ */
+ protected boolean checkResults() throws Exception {
+ FileSystem dfs = FileSystem.get(getConfiguration());
+ Path result = new Path(HDFS_OUTPUT_PATH);
+ Path actual = new Path(ACTUAL_RESULT_DIR);
+ dfs.copyToLocalFile(result, actual);
+
+ TestUtils.compareWithResult(new File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")),
+ new File(FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result", "part-0")));
+ return true;
+ }
+
+ /**
+ * Clean up the HDFS cluster.
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
new file mode 100644
index 0000000..0a7418a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.scheduler;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.api.topology.TopologyDefinitionParser;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import junit.framework.TestCase;
+
+@SuppressWarnings("deprecation")
+public class SchedulerTest extends TestCase {
+ private static final String TOPOLOGY_PATH = "src/test/resources/topology.xml";
+
+ private ClusterTopology parseTopology() throws IOException, SAXException {
+ try (FileReader fr = new FileReader(TOPOLOGY_PATH)) {
+ return TopologyDefinitionParser.parse(new InputSource(fr));
+ }
+ }
+
+ /**
+ * Test the scheduler for the case when the Hyracks cluster is the HDFS cluster
+ *
+ * @throws Exception
+ */
+ public void testSchedulerSimple() throws Exception {
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
+ TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099, 5098, 5097);
+
+ InputSplit[] fileSplits = new InputSplit[6];
+ fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
+ fileSplits[1] = new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[2] = new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
+ fileSplits[3] = new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
+ fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc2", "nc3", "nc5" };
+
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+ }
+
+ /**
+ * Test the case where the HDFS cluster is larger than the Hyracks cluster
+ *
+ * @throws Exception
+ */
+ public void testSchedulerLargerHDFS() throws Exception {
+ int dataPort = 5099;
+ int resultPort = 5098;
+ int messagingPort = 5097;
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
+ TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.", dataPort, resultPort, messagingPort);
+ ncNameToNcInfos.put("nc7",
+ new NodeControllerInfo("nc7", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.7", dataPort),
+ new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
+ ncNameToNcInfos.put("nc12",
+ new NodeControllerInfo("nc12", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.12", dataPort),
+ new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
+
+ InputSplit[] fileSplits = new InputSplit[12];
+ fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
+ fileSplits[1] = new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[2] = new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
+ fileSplits[3] = new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
+ fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
+ fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[8] =
+ new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
+ fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
+ fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
+ fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
+
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+ String[] expectedResults =
+ new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12", "nc7", "nc7", "nc12" };
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+
+ expectedResults =
+ new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc7", "nc12", "nc7", "nc12" };
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+ }
+
+ /**
+ * Test the case where the HDFS cluster is smaller than the Hyracks cluster
+ *
+ * @throws Exception
+ */
+ public void testSchedulerSmallerHDFS() throws Exception {
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
+ TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099, 5098, 5097);
+
+ InputSplit[] fileSplits = new InputSplit[12];
+ fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
+ fileSplits[1] = new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[2] = new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.3" });
+ fileSplits[3] = new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.3" });
+ fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
+ fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.1" });
+ fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.2" });
+ fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+
+ String[] expectedResults =
+ new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6", "nc5", "nc6" };
+
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+ }
+
+ /**
+ * Test the case where the HDFS cluster is smaller than the Hyracks cluster and the number of splits is odd
+ *
+ * @throws Exception
+ */
+ public void testSchedulerSmallerHDFSOdd() throws Exception {
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
+ TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099, 5098, 5097);
+
+ InputSplit[] fileSplits = new InputSplit[13];
+ fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
+ fileSplits[1] = new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[2] = new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.3" });
+ fileSplits[3] = new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.3" });
+ fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
+ fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.1" });
+ fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.2" });
+ fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
+ fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ fileSplits[12] = new FileSplit(new Path("part-13"), 0, 0, new String[] { "10.0.0.2", "10.0.0.4", "10.0.0.5" });
+
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+ "nc5", "nc2", "nc4" };
+
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+ }
+
+ /**
+ * Test boundary cases where the splits array is empty or null
+ *
+ * @throws Exception
+ */
+ public void testSchedulerBoundary() throws Exception {
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
+ TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099, 5098, 5097);
+
+ // test empty file splits
+ InputSplit[] fileSplits = new InputSplit[0];
+ String[] expectedResults = new String[] {};
+
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+
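+ // test null file splits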
+ fileSplits = null;
+ expectedResults = new String[] {};
+
+ scheduler = new Scheduler(ncNameToNcInfos);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(expectedResults[i], locationConstraints[i]);
+ }
+ }
+
+}
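For readers without the test utilities at hand, the NC map these tests consume
can be assembled directly. A minimal sketch, assuming that
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", ...) simply binds each
ncK to address 10.0.0.K via the six-argument NodeControllerInfo constructor
used above (the class name NcMapSketch and the import paths are assumptions):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hyracks.api.client.NodeControllerInfo;
    import org.apache.hyracks.api.client.NodeStatus;
    import org.apache.hyracks.api.comm.NetworkAddress;

    final class NcMapSketch {
        // Builds nc1..ncN, each advertising its data, result and messaging
        // addresses on 10.0.0.i; the trailing int mirrors the literal 2 used
        // in the tests above.
        static Map<String, NodeControllerInfo> build(int n, int dataPort, int resultPort, int messagingPort) {
            Map<String, NodeControllerInfo> map = new HashMap<>();
            for (int i = 1; i <= n; i++) {
                String name = "nc" + i;
                String ip = "10.0.0." + i;
                map.put(name, new NodeControllerInfo(name, NodeStatus.ACTIVE,
                        new NetworkAddress(ip, dataPort),
                        new NetworkAddress(ip, resultPort),
                        new NetworkAddress(ip, messagingPort), 2));
            }
            return map;
        }
    }

With n = 6 and ports 5099/5098/5097 this reproduces the map used in
testSchedulerSimple.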
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
new file mode 100644
index 0000000..1fddc46
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs.utils;
+
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class HyracksUtils {
+
+ public static final String NC1_ID = "nc1";
+ public static final String NC2_ID = "nc2";
+
+ public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+ public static final int TEST_HYRACKS_CC_PORT = 1099;
+ public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
+ public static final String CC_HOST = "localhost";
+
+ public static final int FRAME_SIZE = 65536;
+
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static IHyracksClientConnection hcc;
+
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.setClientListenAddress(CC_HOST);
+ ccConfig.setClusterListenAddress(CC_HOST);
+ ccConfig.setClusterListenPort(TEST_HYRACKS_CC_PORT);
+ ccConfig.setClientListenPort(TEST_HYRACKS_CC_CLIENT_PORT);
+ ccConfig.setJobHistorySize(0);
+ ccConfig.setProfileDumpPeriod(-1);
+
+ // cluster controller
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+
+ // two node controllers
+ NCConfig ncConfig1 = new NCConfig(NC1_ID);
+ ncConfig1.setClusterAddress("localhost");
+ ncConfig1.setClusterListenAddress("localhost");
+ ncConfig1.setClusterPort(TEST_HYRACKS_CC_PORT);
+ ncConfig1.setDataListenAddress("127.0.0.1");
+ ncConfig1.setResultListenAddress("127.0.0.1");
+ nc1 = new NodeControllerService(ncConfig1);
+ nc1.start();
+
+ NCConfig ncConfig2 = new NCConfig(NC2_ID);
+ ncConfig2.setClusterAddress("localhost");
+ ncConfig2.setClusterListenAddress("localhost");
+ ncConfig2.setClusterPort(TEST_HYRACKS_CC_PORT);
+ ncConfig2.setDataListenAddress("127.0.0.1");
+ ncConfig2.setResultListenAddress("127.0.0.1");
+ nc2 = new NodeControllerService(ncConfig2);
+ nc2.start();
+
+ // hyracks connection
+ hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+ }
+
+ public static void deinit() throws Exception {
+ nc2.stop();
+ nc1.stop();
+ cc.stop();
+ }
+
+ public static void runJob(JobSpecification spec, String appName) throws Exception {
+ spec.setFrameSize(FRAME_SIZE);
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ hcc.waitForCompletion(jobId);
+ }
+
+}
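A hypothetical skeleton showing the intended lifecycle of these helpers (the
class, test and application names are illustrative only):

    import org.apache.hyracks.api.job.JobSpecification;
    import org.apache.hyracks.hdfs.utils.HyracksUtils;

    import junit.framework.TestCase;

    public class ExampleMiniClusterTest extends TestCase {

        @Override
        protected void setUp() throws Exception {
            HyracksUtils.init(); // boots one CC and two NCs (nc1, nc2) on localhost
        }

        @Override
        protected void tearDown() throws Exception {
            HyracksUtils.deinit(); // stops nc2, nc1, then the CC
        }

        public void testRunsAJob() throws Exception {
            JobSpecification spec = new JobSpecification();
            // ... add operators, location constraints and connectors here ...
            HyracksUtils.runJob(spec, "example"); // sets the frame size, starts the job, waits
        }
    }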
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
new file mode 100644
index 0000000..02c0a20
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.hdfs2.dataflow;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.hdfs.MiniDFSClusterFactory;
+import org.apache.hyracks.hdfs.lib.RawBinaryComparatorFactory;
+import org.apache.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
+import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;
+import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
+import org.apache.hyracks.hdfs.utils.HyracksUtils;
+import org.apache.hyracks.hdfs2.scheduler.Scheduler;
+import org.junit.Assert;
+
+/**
+ * Test the org.apache.hyracks.hdfs2.dataflow package,
+ * i.e. the operators for the new Hadoop (mapreduce) API.
+ */
+public class DataflowTest extends org.apache.hyracks.hdfs.dataflow.DataflowTest {
+
+ private MiniDFSClusterFactory dfsClusterFactory = new MiniDFSClusterFactory();
+
+ private Job conf;
+
+ @Override
+ public void setUp() throws Exception {
+ conf = new Job();
+ super.setUp();
+ }
+
+ @Override
+ protected Configuration getConfiguration() {
+ return conf.getConfiguration();
+ }
+
+ @Override
+ protected MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws HyracksDataException {
+ return dfsClusterFactory.getMiniDFSCluster(conf, numberOfNC);
+ }
+
+ /**
+ * Test a job with only HDFS reads and writes.
+ *
+ * @throws Exception
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void testHDFSReadWriteOperators() throws Exception {
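+ // Pipeline under test: data-local HDFS read -> per-partition external sort
+ // -> hash-partitioned merging exchange on field 0 -> HDFS write pinned to a
+ // single NC (NC1).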
+ FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+ conf.setInputFormatClass(TextInputFormat.class);
+
+ Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(), getConfiguration());
+ List<InputSplit> splits = inputFormat.getSplits(conf);
+
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ JobSpecification jobSpec = new JobSpecification();
+ RecordDescriptor recordDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+
+ String[] locations =
+ new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
+ HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
+ readSchedule, new TextKeyValueParserFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, locations);
+
+ ExternalSortOperatorDescriptor sortOperator = new ExternalSortOperatorDescriptor(jobSpec, 10, new int[] { 0 },
+ new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, recordDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, sortOperator, locations);
+
+ HDFSWriteOperatorDescriptor writeOperator =
+ new HDFSWriteOperatorDescriptor(jobSpec, conf, new TextTupleWriterFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, HyracksUtils.NC1_ID);
+
+ jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
+ jobSpec.connect(
+ new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
+ new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+ sortOperator, 0, writeOperator, 0);
+ jobSpec.addRoot(writeOperator);
+
+ IHyracksClientConnection client =
+ new HyracksConnection(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ JobId jobId = client.startJob(jobSpec);
+ client.waitForCompletion(jobId);
+
+ Assert.assertTrue(checkResults());
+ }
+}
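Since HyracksUtils above wraps the same connection boilerplate, the submission
at the end of testHDFSReadWriteOperators could be collapsed to the sketch
below, assuming the mini cluster was started through HyracksUtils.init(), and
modulo the frame-size and runtime-profiling settings that runJob applies
("hdfs2-dataflow" is an illustrative application name):

    HyracksUtils.runJob(jobSpec, "hdfs2-dataflow"); // starts the job, waits for completion
    Assert.assertTrue(checkResults());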