Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/06/18 17:49:37 UTC
[5/6] asterixdb git commit: [NO ISSUE] Remove obsolete support for older HDFS versions
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
deleted file mode 100644
index fd1438c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs.lib;
-
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.hdfs.api.ITupleWriter;
-import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class TextTupleWriterFactory implements ITupleWriterFactory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) {
- return new ITupleWriter() {
- private byte newLine = "\n".getBytes()[0];
-
- @Override
- public void open(DataOutput output) {
-
- }
-
- @Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- byte[] data = tuple.getFieldData(0);
- int start = tuple.getFieldStart(0);
- int len = tuple.getFieldLength(0);
- try {
- output.write(data, start, len);
- output.writeByte(newLine);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public void close(DataOutput output) {
-
- }
-
- };
- }
-
-}
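
For reference, the write path of the removed TextTupleWriterFactory is easy to sketch without the Hyracks interfaces: it emits the raw bytes of the tuple's first field followed by a newline. The class and method names below are illustrative only.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class TextTupleWriteSketch {
        private static final byte NEWLINE = '\n';

        // Mirrors the deleted write(): copy the field bytes, then a newline.
        static void writeField(DataOutput out, byte[] data, int start, int len) throws IOException {
            out.write(data, start, len);
            out.writeByte(NEWLINE);
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] record = "hello".getBytes();
            writeField(new DataOutputStream(bos), record, 0, record.length);
            System.out.print(bos); // prints "hello" plus a trailing newline
        }
    }
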
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
deleted file mode 100644
index c53a779..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs.scheduler;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputSplit;
-
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.hdfs.api.INcCollection;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-
-@SuppressWarnings("deprecation")
-public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
-
- @Override
- public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
- final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
- final int[] workloads, final int slotLimit) {
- final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
- for (int i = 0; i < workloads.length; i++) {
- if (workloads[i] < slotLimit) {
- byte[] rawip;
- try {
- rawip = ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress();
- } catch (UnknownHostException e) {
- // QQQ Should probably have a neater solution than this
- throw new RuntimeException(e);
- }
- BytesWritable ip = new BytesWritable(rawip);
- IntWritable availableSlot = availableIpsToSlots.get(ip);
- if (availableSlot == null) {
- availableSlot = new IntWritable(slotLimit - workloads[i]);
- availableIpsToSlots.put(ip, availableSlot);
- } else {
- availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
- }
- }
- }
- return new INcCollection() {
-
- @Override
- public String findNearestAvailableSlot(InputSplit split) {
- try {
- String[] locs = split.getLocations();
- int minDistance = Integer.MAX_VALUE;
- BytesWritable currentCandidateIp = null;
- if (locs != null && locs.length > 0) {
- for (int j = 0; j < locs.length; j++) {
- /**
- * get all the IP addresses from the name
- */
- InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
- for (InetAddress ip : allIps) {
- BytesWritable splitIp = new BytesWritable(ip.getAddress());
- /**
- * if the node controller exists
- */
- BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
- if (candidateNcIp == null) {
- candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
- }
- if (candidateNcIp != null) {
- if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
- byte[] candidateIP = candidateNcIp.getBytes();
- byte[] splitIP = splitIp.getBytes();
- int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
- | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
- int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
- | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
- int distance = Math.abs(candidateInt - splitInt);
- if (minDistance > distance) {
- minDistance = distance;
- currentCandidateIp = candidateNcIp;
- }
- }
- }
- }
- }
- } else {
- for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
- if (entry.getValue().get() > 0) {
- currentCandidateIp = entry.getKey();
- break;
- }
- }
- }
-
- if (currentCandidateIp != null) {
- /**
- * Update the entry of the selected IP
- */
- IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
- availableSlot.set(availableSlot.get() - 1);
- if (availableSlot.get() == 0) {
- availableIpsToSlots.remove(currentCandidateIp);
- }
- /**
- * Update the entry of the selected NC
- */
- List<String> dataLocations = ipToNcMapping
- .get(InetAddress.getByAddress(currentCandidateIp.getBytes()).getHostAddress());
- for (String nc : dataLocations) {
- int ncIndex = ncNameToIndex.get(nc);
- if (workloads[ncIndex] < slotLimit) {
- return nc;
- }
- }
- }
- /** not scheduled */
- return null;
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public int numAvailableSlots() {
- return availableIpsToSlots.size();
- }
-
- };
- }
-}
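
The proximity metric above packs the four IPv4 octets of each address into a signed int and ranks candidate NCs by the absolute difference from the split's address. A self-contained sketch of that arithmetic, with made-up addresses:

    public class IpProximitySketch {
        // Same packing as the deleted code: the first octet is shifted unmasked,
        // so addresses with a high first octet compare as negative ints.
        static int pack(byte[] ip) {
            return ip[0] << 24 | (ip[1] & 0xFF) << 16 | (ip[2] & 0xFF) << 8 | (ip[3] & 0xFF);
        }

        static int distance(byte[] a, byte[] b) {
            return Math.abs(pack(a) - pack(b));
        }

        public static void main(String[] args) {
            byte[] nc = { 10, 0, 0, 5 };
            byte[] split = { 10, 0, 1, 5 };
            System.out.println(distance(nc, split)); // 256: one /24 network apart
        }
    }
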
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
deleted file mode 100644
index 63be8c5..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs.scheduler;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.topology.ClusterTopology;
-import org.apache.hyracks.hdfs.api.INcCollection;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-@SuppressWarnings("deprecation")
-public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
- private static final Logger LOGGER = LogManager.getLogger();
- private ClusterTopology topology;
-
- public RackAwareNcCollectionBuilder(ClusterTopology topology) {
- this.topology = topology;
- }
-
- @Override
- public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
- final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
- final int[] workloads, final int slotLimit) {
- try {
- final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
- for (String NC : NCs) {
- List<Integer> path = new ArrayList<>();
- String ipAddress = InetAddress
- .getByAddress(ncNameToNcInfos.get(NC).getNetworkAddress().lookupIpAddress()).getHostAddress();
- topology.lookupNetworkTerminal(ipAddress, path);
- if (path.isEmpty()) {
- // if the hyracks nc is not in the defined cluster
- path.add(Integer.MIN_VALUE);
- LOGGER.info(NC + "'s IP address is not in the cluster topology file!");
- }
- List<String> ncs = pathToNCs.computeIfAbsent(path, k -> new ArrayList<>());
- ncs.add(NC);
- }
-
- final TreeMap<List<Integer>, IntWritable> availableIpsToSlots =
- new TreeMap<List<Integer>, IntWritable>((l1, l2) -> {
- int commonLength = Math.min(l1.size(), l2.size());
- for (int i = 0; i < commonLength; i++) {
- int value1 = l1.get(i);
- int value2 = l2.get(i);
- int cmp = Integer.compare(value1, value2);
- if (cmp != 0) {
- return cmp;
- }
- }
- return Integer.compare(l1.size(), l2.size());
- });
- for (int i = 0; i < workloads.length; i++) {
- if (workloads[i] < slotLimit) {
- List<Integer> path = new ArrayList<Integer>();
- String ipAddress =
- InetAddress.getByAddress(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress())
- .getHostAddress();
- topology.lookupNetworkTerminal(ipAddress, path);
- if (path.isEmpty()) {
- // if the hyracks nc is not in the defined cluster
- path.add(Integer.MIN_VALUE);
- }
- IntWritable availableSlot = availableIpsToSlots.get(path);
- if (availableSlot == null) {
- availableSlot = new IntWritable(slotLimit - workloads[i]);
- availableIpsToSlots.put(path, availableSlot);
- } else {
- availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
- }
- }
- }
- return new INcCollection() {
-
- @Override
- public String findNearestAvailableSlot(InputSplit split) {
- try {
- String[] locs = split.getLocations();
- int minDistance = Integer.MAX_VALUE;
- List<Integer> currentCandidatePath = null;
- if (locs != null && locs.length > 0) {
- for (String loc : locs) {
- /*
- * get all the IP addresses from the name
- */
- InetAddress[] allIps = InetAddress.getAllByName(loc);
- boolean inTopology = false;
- for (InetAddress ip : allIps) {
- List<Integer> splitPath = new ArrayList<>();
- boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
- if (!inCluster) {
- continue;
- }
- inTopology = true;
- /*
- * if the node controller exists
- */
- List<Integer> candidatePath = availableIpsToSlots.floorKey(splitPath);
- if (candidatePath == null) {
- candidatePath = availableIpsToSlots.ceilingKey(splitPath);
- }
- if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
- int distance = distance(splitPath, candidatePath);
- if (minDistance > distance) {
- minDistance = distance;
- currentCandidatePath = candidatePath;
- }
- }
- }
-
- if (!inTopology) {
- LOGGER.info(loc + "'s IP address is not in the cluster topology file!");
- /*
- * if the machine is not in the topology file
- */
- List<Integer> candidatePath = null;
- for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
- if (entry.getValue().get() > 0) {
- candidatePath = entry.getKey();
- break;
- }
- }
- /* the split path is empty */
- if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
- currentCandidatePath = candidatePath;
- }
- }
- }
- } else {
- for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
- if (entry.getValue().get() > 0) {
- currentCandidatePath = entry.getKey();
- break;
- }
- }
- }
-
- if (currentCandidatePath != null && !currentCandidatePath.isEmpty()) {
- /*
- * Update the entry of the selected IP
- */
- IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
- availableSlot.set(availableSlot.get() - 1);
- if (availableSlot.get() == 0) {
- availableIpsToSlots.remove(currentCandidatePath);
- }
- /*
- * Update the entry of the selected NC
- */
- List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
- for (String candidate : candidateNcs) {
- int ncIndex = ncNameToIndex.get(candidate);
- if (workloads[ncIndex] < slotLimit) {
- return candidate;
- }
- }
- }
- /* not scheduled */
- return null;
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public int numAvailableSlots() {
- return availableIpsToSlots.size();
- }
-
- private int distance(List<Integer> splitPath, List<Integer> candidatePath) {
- int commonLength = Math.min(splitPath.size(), candidatePath.size());
- int distance = 0;
- for (int i = 0; i < commonLength; i++) {
- distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
- }
- List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
- for (int i = commonLength; i < restElements.size(); i++) {
- distance = distance * 100 + Math.abs(restElements.get(i));
- }
- return distance;
- }
- };
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-}
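
The distance() helper above treats a topology path as a sequence of base-100 digits, so a mismatch near the root of the topology tree (a different rack or switch) dominates any mismatch at the leaves. A worked sketch with hypothetical paths:

    import java.util.Arrays;
    import java.util.List;

    public class RackDistanceSketch {
        static int distance(List<Integer> splitPath, List<Integer> candidatePath) {
            int commonLength = Math.min(splitPath.size(), candidatePath.size());
            int distance = 0;
            for (int i = 0; i < commonLength; i++) {
                distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
            }
            List<Integer> rest = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
            for (int i = commonLength; i < rest.size(); i++) {
                distance = distance * 100 + Math.abs(rest.get(i));
            }
            return distance;
        }

        public static void main(String[] args) {
            // same rack, different node: only the last digit differs
            System.out.println(distance(Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 7))); // 4
            // different rack: the higher-order digit dominates
            System.out.println(distance(Arrays.asList(1, 2, 3), Arrays.asList(1, 5, 3))); // 300
        }
    }
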
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
deleted file mode 100644
index 9633fb1..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs.scheduler;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Random;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.topology.ClusterTopology;
-import org.apache.hyracks.hdfs.api.INcCollection;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * The scheduler conducts data-local scheduling for data reading on HDFS. This
- * class works with the Hadoop old API.
- */
-public class Scheduler {
- private static final Logger LOGGER = LogManager.getLogger();
-
- /** a list of NCs */
- private String[] NCs;
-
- /** a map from ip to NCs */
- private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
- /** a map from the NC name to the index */
- private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
-
- /** a map from NC name to the NodeControllerInfo */
- private Map<String, NodeControllerInfo> ncNameToNcInfos;
-
- /**
- * the nc collection builder
- */
- private INcCollectionBuilder ncCollectionBuilder;
-
- /**
- * The constructor of the scheduler.
- *
- * @param ipAddress
- * the IP address of the Hyracks cluster controller
- * @param port
- * the port of the Hyracks cluster controller
- * @throws HyracksException
- */
-
- public Scheduler(String ipAddress, int port) throws HyracksException {
- try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- this.ncNameToNcInfos = hcc.getNodeControllerInfos();
- ClusterTopology topology = hcc.getClusterTopology();
- this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
- : new RackAwareNcCollectionBuilder(topology);
- loadIPAddressToNCMap(ncNameToNcInfos);
- } catch (Exception e) {
- throw HyracksException.create(e);
- }
- }
-
- /**
- * The constructor of the scheduler.
- *
- * @param ipAddress
- * the IP address of the Hyracks cluster controller
- * @param port
- * the port of the Hyracks cluster controller
- * @param ncCollectionBuilder
- * the NC collection builder to use
- * @throws HyracksException
- */
- public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
- try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- this.ncNameToNcInfos = hcc.getNodeControllerInfos();
- this.ncCollectionBuilder = ncCollectionBuilder;
- loadIPAddressToNCMap(ncNameToNcInfos);
- } catch (Exception e) {
- throw HyracksException.create(e);
- }
- }
-
- /**
- * The constructor of the scheduler.
- *
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @throws HyracksException
- */
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
- this.ncNameToNcInfos = ncNameToNcInfos;
- this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
- loadIPAddressToNCMap(ncNameToNcInfos);
- }
-
- /**
- * The constructor of the scheduler.
- *
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @param topology
- * the hyracks cluster topology
- * @throws HyracksException
- */
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
- throws HyracksException {
- this(ncNameToNcInfos);
- this.ncCollectionBuilder =
- topology == null ? new IPProximityNcCollectionBuilder() : new RackAwareNcCollectionBuilder(topology);
- }
-
- /**
- * The constructor of the scheduler.
- *
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @param ncCollectionBuilder
- * the NC collection builder to use
- * @throws HyracksException
- */
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
- throws HyracksException {
- this.ncNameToNcInfos = ncNameToNcInfos;
- this.ncCollectionBuilder = ncCollectionBuilder;
- loadIPAddressToNCMap(ncNameToNcInfos);
- }
-
- /**
- * Set location constraints for a file scan operator with a list of file
- * splits. It guarantees that the maximum number of slots a machine can get is
- * at most one more than the minimum number of slots a machine can get.
- *
- * @throws HyracksDataException
- */
- public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
- if (splits == null) {
- /** handle the case where the splits array is null */
- return new String[] {};
- }
- int[] workloads = new int[NCs.length];
- Arrays.fill(workloads, 0);
- String[] locations = new String[splits.length];
- Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
- /**
- * upper bound number of slots that a machine can get
- */
- int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
- : (splits.length / workloads.length + 1);
- /**
- * lower bound number of slots that a machine can get
- */
- int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
-
- try {
- Random random = new Random(System.currentTimeMillis());
- boolean scheduled[] = new boolean[splits.length];
- Arrays.fill(scheduled, false);
- /**
- * scan the splits and build the popularity map
- * give the machines with fewer local splits more scheduling priority
- */
- buildPopularityMap(splits, locationToNumOfSplits);
- /**
- * push data-local lower-bounds slots to each machine
- */
- scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
- /**
- * push data-local upper-bounds slots to each machine
- */
- scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
-
- int dataLocalCount = 0;
- for (int i = 0; i < scheduled.length; i++) {
- if (scheduled[i] == true) {
- dataLocalCount++;
- }
- }
- LOGGER.info("Data local rate: "
- + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
- /**
- * push non-data-local lower-bounds slots to each machine
- */
- scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
- /**
- * push non-data-local upper-bounds slots to each machine
- */
- scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
- return locations;
- } catch (IOException e) {
- throw HyracksException.create(e);
- }
- }
-
- /**
- * Schedule non-local slots to each machine
- *
- * @param splits
- * The HDFS file splits.
- * @param workloads
- * The current capacity of each machine.
- * @param locations
- * The result schedule.
- * @param slotLimit
- * The maximum slots of each machine.
- * @param scheduled
- * Indicate which slot is scheduled.
- */
- private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
- boolean[] scheduled) throws IOException, UnknownHostException {
- /**
- * build the map from available ips to the number of available slots
- */
- INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
- workloads, slotLimit);
- if (ncCollection.numAvailableSlots() == 0) {
- return;
- }
- /**
- * schedule non-local file reads
- */
- for (int i = 0; i < splits.length; i++) {
- /** if there is no data-local NC choice, pick the nearest available one */
- if (!scheduled[i]) {
- InputSplit split = splits[i];
- String selectedNcName = ncCollection.findNearestAvailableSlot(split);
- if (selectedNcName != null) {
- int ncIndex = ncNameToIndex.get(selectedNcName);
- workloads[ncIndex]++;
- scheduled[i] = true;
- locations[i] = selectedNcName;
- }
- }
- }
- }
-
- /**
- * Schedule data-local slots to each machine.
- *
- * @param splits
- * The HDFS file splits.
- * @param workloads
- * The current capacity of each machine.
- * @param locations
- * The result schedule.
- * @param slots
- * The maximum slots of each machine.
- * @param random
- * The random generator.
- * @param scheduled
- * Indicate which slot is scheduled.
- * @throws IOException
- * @throws UnknownHostException
- */
- private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
- boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits)
- throws IOException, UnknownHostException {
- /** scheduling candidates will be ordered inversely according to their popularity */
- PriorityQueue<String> scheduleCandidates = new PriorityQueue<String>(3, new Comparator<String>() {
-
- @Override
- public int compare(String s1, String s2) {
- return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
- }
-
- });
- for (int i = 0; i < splits.length; i++) {
- if (scheduled[i]) {
- continue;
- }
- /**
- * get the location of all the splits
- */
- String[] locs = splits[i].getLocations();
- if (locs.length > 0) {
- scheduleCandidates.clear();
- for (int j = 0; j < locs.length; j++) {
- scheduleCandidates.add(locs[j]);
- }
-
- for (String candidate : scheduleCandidates) {
- /**
- * get all the IP addresses from the name
- */
- InetAddress[] allIps = InetAddress.getAllByName(candidate);
- /**
- * iterate over all IPs
- */
- for (InetAddress ip : allIps) {
- /**
- * if the node controller exists
- */
- if (ipToNcMapping.get(ip.getHostAddress()) != null) {
- /**
- * set the ncs
- */
- List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
- int arrayPos = random.nextInt(dataLocations.size());
- String nc = dataLocations.get(arrayPos);
- int pos = ncNameToIndex.get(nc);
- /**
- * check if the node is already full
- */
- if (workloads[pos] < slots) {
- locations[i] = nc;
- workloads[pos]++;
- scheduled[i] = true;
- break;
- }
- }
- }
- /**
- * break the loop for data-locations if the schedule has
- * already been found
- */
- if (scheduled[i] == true) {
- break;
- }
- }
- }
- }
- }
-
- /**
- * Scan the splits once and build a popularity map
- *
- * @param splits
- * the split array
- * @param locationToNumOfSplits
- * the map to be built
- * @throws IOException
- */
- private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
- throws IOException {
- for (InputSplit split : splits) {
- String[] locations = split.getLocations();
- for (String loc : locations) {
- IntWritable locCount = locationToNumOfSplits.get(loc);
- if (locCount == null) {
- locCount = new IntWritable(0);
- locationToNumOfSplits.put(loc, locCount);
- }
- locCount.set(locCount.get() + 1);
- }
- }
- }
-
- /**
- * Load the IP-address-to-NC map from the NCNameToNCInfoMap
- *
- * @param ncNameToNcInfos
- * @throws HyracksException
- */
- private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
- try {
- NCs = new String[ncNameToNcInfos.size()];
- ipToNcMapping.clear();
- ncNameToIndex.clear();
- int i = 0;
-
- /*
- * build the IP address to NC map
- */
- for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
- String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
- .getHostAddress();
- List<String> matchedNCs = ipToNcMapping.computeIfAbsent(ipAddr, k -> new ArrayList<>());
- matchedNCs.add(entry.getKey());
- NCs[i] = entry.getKey();
- i++;
- }
-
- /*
- * set up the NC name to index mapping
- */
- for (i = 0; i < NCs.length; i++) {
- ncNameToIndex.put(NCs[i], i);
- }
- } catch (Exception e) {
- throw HyracksException.create(e);
- }
- }
-}
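
The balance guarantee of getLocationConstraints follows from the bound arithmetic near the top of the method: with S splits over N node controllers, every machine is capped at ceil(S/N) slots and, when S does not divide evenly, allowed no fewer than ceil(S/N) - 1. A sketch with hypothetical numbers:

    public class SlotBoundsSketch {
        public static void main(String[] args) {
            int splits = 10, machines = 4; // hypothetical workload
            int upper = splits % machines == 0 ? splits / machines : splits / machines + 1;
            int lower = splits % machines == 0 ? upper : upper - 1;
            // Every machine ends up with between lower and upper splits, i.e. 2..3 here.
            System.out.println("upper=" + upper + ", lower=" + lower);
        }
    }
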
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
deleted file mode 100644
index b02a97b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.Serializable;
-
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ConfFactory implements Serializable {
- private static final long serialVersionUID = 1L;
- private byte[] confBytes;
-
- public ConfFactory(Job conf) throws HyracksDataException {
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- conf.getConfiguration().write(dos);
- confBytes = bos.toByteArray();
- dos.close();
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-
- public Job getConf() throws HyracksDataException {
- try {
- Job conf = new Job();
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
- conf.getConfiguration().readFields(dis);
- dis.close();
- return conf;
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-}
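
ConfFactory works because a Hadoop Configuration is a Writable: it can be flattened to a byte[] that ships with the Hyracks job and is rebuilt on each node controller. A round-trip sketch, assuming hadoop-common on the classpath (the property name is only an example):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    public class ConfRoundTrip {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration(false);
            conf.set("example.input.dir", "/data/in"); // hypothetical key

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream dos = new DataOutputStream(bos)) {
                conf.write(dos); // serialize all key/value pairs
            }

            Configuration copy = new Configuration(false);
            try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                copy.readFields(dis); // rebuild on the remote side
            }
            System.out.println(copy.get("example.input.dir")); // /data/in
        }
    }
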
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
deleted file mode 100644
index 34b4c3e..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-@SuppressWarnings("rawtypes")
-public class FileSplitsFactory implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private byte[] splitBytes;
- private String splitClassName;
-
- public FileSplitsFactory(List<FileSplit> splits) throws HyracksDataException {
- splitBytes = splitsToBytes(splits);
- if (splits.size() > 0) {
- splitClassName = splits.get(0).getClass().getName();
- }
- }
-
- public List<FileSplit> getSplits() throws HyracksDataException {
- return bytesToSplits(splitBytes);
- }
-
- /**
- * Convert splits to bytes.
- *
- * @param splits
- * input splits
- * @return bytes which serialize the splits
- * @throws IOException
- */
- private byte[] splitsToBytes(List<FileSplit> splits) throws HyracksDataException {
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeInt(splits.size());
- int size = splits.size();
- for (int i = 0; i < size; i++) {
- splits.get(i).write(dos);
- }
- dos.close();
- return bos.toByteArray();
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-
- /**
- * Convert bytes to splits.
- *
- * @param bytes
- * the serialized splits
- * @return the deserialized list of file splits
- * @throws HyracksDataException
- */
- private List<FileSplit> bytesToSplits(byte[] bytes) throws HyracksDataException {
- try {
- Class splitClass = Class.forName(splitClassName);
- Constructor[] constructors = splitClass.getDeclaredConstructors();
- Constructor defaultConstructor = null;
- for (Constructor constructor : constructors) {
- if (constructor.getParameterTypes().length == 0) {
- constructor.setAccessible(true);
- defaultConstructor = constructor;
- }
- }
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- DataInputStream dis = new DataInputStream(bis);
- int size = dis.readInt();
- List<FileSplit> splits = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- splits.add((FileSplit) defaultConstructor.newInstance());
- splits.get(i).readFields(dis);
- }
- dis.close();
- return splits;
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-}
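
The reflection in bytesToSplits instantiates whatever concrete split class was serialized, via its no-arg constructor, made accessible in case it is not public, and then populates each instance through readFields. A dependency-free sketch of that trick (java.util.ArrayList stands in for the split class):

    import java.lang.reflect.Constructor;

    public class DefaultCtorSketch {
        static Object newViaDefaultCtor(String className) throws Exception {
            Class<?> cls = Class.forName(className);
            for (Constructor<?> ctor : cls.getDeclaredConstructors()) {
                if (ctor.getParameterTypes().length == 0) {
                    ctor.setAccessible(true); // reach non-public constructors too
                    return ctor.newInstance();
                }
            }
            throw new NoSuchMethodException(className + " has no default constructor");
        }

        public static void main(String[] args) throws Exception {
            System.out.println(newViaDefaultCtor("java.util.ArrayList")); // []
        }
    }
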
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
deleted file mode 100644
index 5adec78..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-import org.apache.hyracks.hdfs.api.IKeyValueParser;
-import org.apache.hyracks.hdfs.api.IKeyValueParserFactory;
-
-/**
- * The HDFS file read operator using the Hadoop new API. To use this operator, a
- * user needs to provide an IKeyValueParserFactory implementation which converts
- * key-value pairs into tuples.
- */
-@SuppressWarnings("rawtypes")
-public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- private final ConfFactory confFactory;
- private final FileSplitsFactory splitsFactory;
- private final String[] scheduledLocations;
- private final IKeyValueParserFactory tupleParserFactory;
- private final boolean[] executed;
-
- /**
- * The constructor of HDFSReadOperatorDescriptor.
- *
- * @param spec
- * the JobSpecification object
- * @param rd
- * the output record descriptor
- * @param conf
- * the Hadoop JobConf object, which contains the input format and
- * the input paths
- * @param splits
- * the array of FileSplits (HDFS chunks).
- * @param scheduledLocations
- * the node controller names to scan the FileSplits, which is a
- * one-to-one mapping. The String array is obtained from
- * org.apache.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints
- * (InputSplit[]).
- * @param tupleParserFactory
- * the ITupleParserFactory implementation instance.
- * @throws HyracksException
- */
- public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, List<InputSplit> splits,
- String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
- super(spec, 0, 1);
- try {
- List<FileSplit> fileSplits = new ArrayList<FileSplit>();
- for (int i = 0; i < splits.size(); i++) {
- fileSplits.add((FileSplit) splits.get(i));
- }
- this.splitsFactory = new FileSplitsFactory(fileSplits);
- this.confFactory = new ConfFactory(conf);
- } catch (Exception e) {
- throw HyracksException.create(e);
- }
- this.scheduledLocations = scheduledLocations;
- this.executed = new boolean[scheduledLocations.length];
- Arrays.fill(executed, false);
- this.tupleParserFactory = tupleParserFactory;
- this.outRecDescs[0] = rd;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
- final List<FileSplit> inputSplits = splitsFactory.getSplits();
-
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
- private ContextFactory ctxFactory = new ContextFactory();
-
- @SuppressWarnings("unchecked")
- @Override
- public void initialize() throws HyracksDataException {
- ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
- try {
- writer.open();
- Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
- Job job = confFactory.getConf();
- job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
- IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
- InputFormat inputFormat =
- ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
- int size = inputSplits.size();
- for (int i = 0; i < size; i++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (scheduledLocations[i].equals(nodeName)) {
- /**
- * pick an unread split to read; synchronize among
- * simultaneous partitions on the same machine
- */
- synchronized (executed) {
- if (executed[i] == false) {
- executed[i] = true;
- } else {
- continue;
- }
- }
-
- /**
- * read the split
- */
- TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
- context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
- RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
- reader.initialize(inputSplits.get(i), context);
- while (reader.nextKeyValue() == true) {
- parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer,
- inputSplits.get(i).toString());
- }
- }
- }
- parser.close(writer);
- } catch (Throwable th) {
- writer.fail();
- throw HyracksDataException.create(th);
- } finally {
- writer.close();
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
- };
- }
-}
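
When several partitions of this operator run on one node, they share the executed[] array, and the synchronized test-and-set in initialize() guarantees each scheduled split is read exactly once. A minimal sketch of that claiming pattern (class and method names are illustrative):

    public class SplitClaimSketch {
        private final boolean[] executed = new boolean[4];

        boolean claim(int i) {
            synchronized (executed) {
                if (!executed[i]) {
                    executed[i] = true; // this thread reads split i
                    return true;
                }
                return false; // another partition got there first
            }
        }

        public static void main(String[] args) {
            SplitClaimSketch s = new SplitClaimSketch();
            System.out.println(s.claim(0)); // true
            System.out.println(s.claim(0)); // false
        }
    }
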
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
deleted file mode 100644
index c691896..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.dataflow;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.hdfs.api.ITupleWriter;
-import org.apache.hyracks.hdfs.api.ITupleWriterFactory;
-
-/**
- * The HDFS file write operator using the Hadoop new API. To use this operator,
- * a user needs to provide an ITupleWriterFactory.
- */
-public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- private ConfFactory confFactory;
- private ITupleWriterFactory tupleWriterFactory;
-
- /**
- * The constructor of HDFSWriteOperatorDescriptor.
- *
- * @param spec
- * the JobSpecification object
- * @param conf
- * the Hadoop JobConf which contains the output path
- * @param tupleWriterFactory
- * the ITupleWriterFactory implementation object
- * @throws HyracksException
- */
- public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, ITupleWriterFactory tupleWriterFactory)
- throws HyracksException {
- super(spec, 1, 0);
- this.confFactory = new ConfFactory(conf);
- this.tupleWriterFactory = tupleWriterFactory;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
-
- return new AbstractUnaryInputSinkOperatorNodePushable() {
-
- private FSDataOutputStream dos;
- private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
- private FrameTupleReference tuple = new FrameTupleReference();
- private ITupleWriter tupleWriter;
- private ClassLoader ctxCL;
-
- @Override
- public void open() throws HyracksDataException {
- ctxCL = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- Job conf = confFactory.getConf();
- String outputPath = FileOutputFormat.getOutputPath(conf).toString();
- String fileName = outputPath + File.separator + "part-" + partition;
-
- tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
- try {
- FileSystem dfs = FileSystem.get(conf.getConfiguration());
- dos = dfs.create(new Path(fileName), true);
- tupleWriter.open(dos);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- tupleWriter.write(dos, tuple);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
-
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- tupleWriter.close(dos);
- dos.close();
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
-
- };
- }
-}
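
Each write partition opens its own "part-<n>" file under the job's output path, so partitions never contend on a single stream. A minimal sketch of that open/write/close cycle, assuming hadoop-common on the classpath and using the local file system with a hypothetical output directory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PartFileSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            int partition = 0;
            Path out = new Path("/tmp/hdfs-write-demo/part-" + partition);
            try (FSDataOutputStream dos = fs.create(out, true)) { // overwrite = true
                dos.writeBytes("one tuple per line\n");
            }
            System.out.println("wrote " + out);
        }
    }
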
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
deleted file mode 100644
index f5b2384..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs2.scheduler;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.topology.ClusterTopology;
-import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
-
-/**
- * The scheduler conducts data-local scheduling for data reading on HDFS.
- * This class works with the Hadoop new API.
- */
-@SuppressWarnings("deprecation")
-public class Scheduler {
-
- private org.apache.hyracks.hdfs.scheduler.Scheduler scheduler;
-
- /**
- * The constructor of the scheduler.
- *
- * @param ipAddress
- * the IP address of the Hyracks cluster controller
- * @param port
- * the port of the Hyracks cluster controller
- * @throws HyracksException
- */
- public Scheduler(String ipAddress, int port) throws HyracksException {
- scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
- }
-
- /**
- * The constructor of the scheduler.
- *
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @throws HyracksException
- */
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
- scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
- }
-
- /**
- * The constructor of the scheduler.
- *
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @param topology
- * the hyracks cluster topology
- * @throws HyracksException
- */
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
- throws HyracksException {
- scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, topology);
- }
-
- /**
- * The constructor of the scheduler.
- *
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @param builder
- * the NC collection builder to use
- * @throws HyracksException
- */
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)
- throws HyracksException {
- scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
- }
-
- /**
- * Set location constraints for a file scan operator with a list of file splits
- *
- * @throws HyracksDataException
- */
- public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
- try {
- org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
- for (int i = 0; i < inputSplits.length; i++)
- inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
- return scheduler.getLocationConstraints(inputSplits);
- } catch (Exception e) {
- throw HyracksException.create(e);
- }
- }
-}
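
The adapter above needs only two pieces of state from each new-API split, its locations and its length, since that is all the old-API scheduler consults. A sketch of the reduction, assuming hadoop-mapreduce-client-core on the classpath and a hypothetical input file:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class WrapSplitsSketch {
        public static void main(String[] args) throws Exception {
            List<FileSplit> newApiSplits = new ArrayList<>();
            newApiSplits.add(new FileSplit(new Path("/data/a.txt"), 0, 128L << 20,
                    new String[] { "nc1", "nc2" }));
            for (FileSplit s : newApiSplits) {
                // The locations and length below are what WrappedFileSplit keeps.
                System.out.println(Arrays.toString(s.getLocations()) + " len=" + s.getLength());
            }
        }
    }
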
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
deleted file mode 100644
index 6804f31..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/scheduler/WrappedFileSplit.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.hdfs2.scheduler;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.mapred.InputSplit;
-
-/**
- * The wrapped implementation of InputSplit, for the new API scheduler
- * to reuse the old API scheduler
- */
-@SuppressWarnings("deprecation")
-public class WrappedFileSplit implements InputSplit {
-
- private String[] locations;
- private long length;
-
- public WrappedFileSplit(String[] locations, long length) {
- this.locations = locations;
- this.length = length;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- int len = input.readInt();
- locations = new String[len];
- for (int i = 0; i < len; i++)
- locations[i] = input.readUTF();
- length = input.readLong();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeInt(locations.length);
- for (int i = 0; i < locations.length; i++)
- output.writeUTF(locations[i]);
- output.writeLong(length);
- }
-
- @Override
- public long getLength() throws IOException {
- return length;
- }
-
- @Override
- public String[] getLocations() throws IOException {
- return locations;
- }
-
-}
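
For a Writable like this, write() must be the exact mirror of readFields(): writeInt pairs with readInt, writeUTF with readUTF, writeLong with readLong (a bare DataOutput.write(int) would emit a single byte and break the pairing). A dependency-free round-trip sketch of that symmetry:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SplitRoundTrip {
        public static void main(String[] args) throws IOException {
            String[] locations = { "nc1", "nc2" };
            long length = 42L;

            // The write() side.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(locations.length);
            for (String loc : locations) {
                out.writeUTF(loc);
            }
            out.writeLong(length);

            // The readFields() side rebuilds the same state.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            String[] read = new String[in.readInt()];
            for (int i = 0; i < read.length; i++) {
                read[i] = in.readUTF();
            }
            System.out.println(read[0] + "," + read[1] + " len=" + in.readLong());
        }
    }
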
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2d90c0c1/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
deleted file mode 100644
index 8f96bab..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs.dataflow;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.hdfs.lib.RawBinaryComparatorFactory;
-import org.apache.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
-import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;
-import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
-import org.apache.hyracks.hdfs.utils.HyracksUtils;
-import org.apache.hyracks.test.support.TestUtils;
-import org.apache.hyracks.util.file.FileUtil;
-import org.junit.Assert;
-
-import junit.framework.TestCase;
-
-/**
- * Tests the org.apache.hyracks.hdfs.dataflow package,
- * i.e. the operators built on the old Hadoop API.
- */
-@SuppressWarnings({ "deprecation" })
-public class DataflowTest extends TestCase {
-
- protected static final String ACTUAL_RESULT_DIR = FileUtil.joinPath("target", "actual");
- private static final String TEST_RESOURCES = FileUtil.joinPath("src", "test", "resources");
- protected static final String EXPECTED_RESULT_PATH = FileUtil.joinPath(TEST_RESOURCES, "expected");
- private static final String PATH_TO_HADOOP_CONF = FileUtil.joinPath(TEST_RESOURCES, "hadoop", "conf");
- protected static final String BUILD_DIR = FileUtil.joinPath("target", "build");
-
- private static final String DATA_PATH = FileUtil.joinPath(TEST_RESOURCES, "data", "customer.tbl");
- protected static final String HDFS_INPUT_PATH = "/customer/";
- protected static final String HDFS_OUTPUT_PATH = "/customer_result/";
-
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String MINIDFS_BASEDIR = FileUtil.joinPath("target", "hdfs");
- private MiniDFSCluster dfsCluster;
-
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
-
- @Override
- public void setUp() throws Exception {
- cleanupStores();
- HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File(MINIDFS_BASEDIR));
- FileUtils.cleanDirectory(new File(MINIDFS_BASEDIR));
- }
-
- protected Configuration getConfiguration() {
- return conf;
- }
-
- protected MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws IOException {
- return new MiniDFSCluster(conf, numberOfNC, true, null);
- }
-
- /**
- * Start the HDFS cluster and set up the data files
- *
- * @throws IOException
- */
- protected void startHDFS() throws IOException {
- getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path(BUILD_DIR), true);
- System.setProperty("hadoop.log.dir", FileUtil.joinPath("target", "logs"));
- getConfiguration().set("hdfs.minidfs.basedir", MINIDFS_BASEDIR);
- dfsCluster = getMiniDFSCluster(getConfiguration(), numberOfNC);
- FileSystem dfs = FileSystem.get(getConfiguration());
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- Path result = new Path(HDFS_OUTPUT_PATH);
- dfs.mkdirs(dest);
- dfs.mkdirs(result);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- getConfiguration().writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- /**
- * Test a job with only HDFS reads and writes.
- *
- * @throws Exception
- */
- public void testHDFSReadWriteOperators() throws Exception {
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- conf.setInputFormat(TextInputFormat.class);
-
- Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
- InputSplit[] splits = conf.getInputFormat().getSplits(conf, numberOfNC * 4);
-
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- JobSpecification jobSpec = new JobSpecification();
- RecordDescriptor recordDesc =
- new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
-
- String[] locations =
- new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
- HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
- readSchedule, new TextKeyValueParserFactory());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, locations);
-
- ExternalSortOperatorDescriptor sortOperator = new ExternalSortOperatorDescriptor(jobSpec, 10, new int[] { 0 },
- new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, recordDesc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, sortOperator, locations);
-
- HDFSWriteOperatorDescriptor writeOperator =
- new HDFSWriteOperatorDescriptor(jobSpec, conf, new TextTupleWriterFactory());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, HyracksUtils.NC1_ID);
-
- jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
- jobSpec.connect(
- new MToNPartitioningMergingConnectorDescriptor(jobSpec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
- new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
- sortOperator, 0, writeOperator, 0);
- jobSpec.addRoot(writeOperator);
-
- IHyracksClientConnection client =
- new HyracksConnection(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
- JobId jobId = client.startJob(jobSpec);
- client.waitForCompletion(jobId);
-
- Assert.assertTrue(checkResults());
- }
-
- /**
- * Check if the results are correct
- *
- * @return true if correct
- * @throws Exception
- */
- protected boolean checkResults() throws Exception {
- FileSystem dfs = FileSystem.get(getConfiguration());
- Path result = new Path(HDFS_OUTPUT_PATH);
- Path actual = new Path(ACTUAL_RESULT_DIR);
- dfs.copyToLocalFile(result, actual);
-
- TestUtils.compareWithResult(new File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")),
- new File(FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result", "part-0")));
- return true;
- }
-
- /**
- * Clean up the HDFS cluster
- */
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-
- @Override
- public void tearDown() throws Exception {
- HyracksUtils.deinit();
- cleanupHDFS();
- }
-
-}
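
The HDFS-side scaffolding the deleted test relied on is compact enough to
restate for anyone reproducing this coverage elsewhere. A minimal sketch,
using the same deprecated MiniDFSCluster constructor; the class name and
paths here are placeholders, not taken from the original tree:

    // Minimal sketch of the MiniDFSCluster setup pattern from startHDFS()
    // above (standalone form; class name and paths are placeholders).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class MiniDfsSetupSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("hdfs.minidfs.basedir", "target/hdfs");

            // Deprecated old-API constructor, as in the deleted test:
            // (conf, number of datanodes, format, racks).
            MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
            try {
                // The constructor points fs.defaultFS at the mini cluster,
                // so this returns its DFS rather than the local filesystem.
                FileSystem dfs = FileSystem.get(conf);
                dfs.mkdirs(new Path("/customer/"));
                dfs.copyFromLocalFile(
                        new Path("src/test/resources/data/customer.tbl"),
                        new Path("/customer/"));
            } finally {
                cluster.shutdown();
            }
        }
    }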
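
The dataflow that testHDFSReadWriteOperators assembled is also worth
sketching, since it is the shape this module exercised end to end:

    HDFSRead (4 partitions, co-located with the input splits)
        |  OneToOneConnector
    ExternalSort (on field 0, RawBinaryComparatorFactory)
        |  MToNPartitioningMergingConnector (hash on field 0, merging sorted streams)
    HDFSWrite (single partition, pinned to NC1)

The merging connector is what makes a single globally sorted output file
possible: each sort partition emits a sorted stream, and the connector merges
them with the same comparator on the way into the lone writer.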