You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2022/09/01 20:34:53 UTC

Change in asterixdb[neo]: [ASTERIXDB-3072] Make HDFS scheduler to use different type of channels

From Wail Alkowaileet <wa...@gmail.com>:

Wail Alkowaileet has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17224 )


Change subject: [ASTERIXDB-3072] Make HDFS scheduler to use different type of channels
......................................................................

[ASTERIXDB-3072] Make HDFS scheduler to use different type of channels

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
HDFS scheduler only uses plain connection. This change allows it
to use different type of connections

Change-Id: I64f9f4a30a6564aac26e99433ccc695975a22a3b
---
M hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
M hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
3 files changed, 46 insertions(+), 35 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/17224/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 9e49d86..3506216 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -62,6 +62,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 
@@ -72,10 +73,12 @@
 
     public static Scheduler initializeHDFSScheduler(ICCServiceContext serviceCtx) throws HyracksDataException {
         ICCContext ccContext = serviceCtx.getCCContext();
+        INetworkSecurityManager networkSecurityManager = serviceCtx.getControllerService().getNetworkSecurityManager();
         Scheduler scheduler = null;
         try {
             scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
-                    ccContext.getClusterControllerInfo().getClientNetPort());
+                    ccContext.getClusterControllerInfo().getClientNetPort(),
+                    networkSecurityManager.getSocketChannelFactory());
         } catch (HyracksException e) {
             throw new RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index b9d68f7..f5bf07b 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.hdfs.api.INcCollection;
 import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
@@ -56,16 +57,24 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    /** a list of NCs */
+    /**
+     * a list of NCs
+     */
     private String[] NCs;
 
-    /** a map from ip to NCs */
+    /**
+     * a map from ip to NCs
+     */
     private Map<String, List<String>> ipToNcMapping = new HashMap<>();
 
-    /** a map from the NC name to the index */
+    /**
+     * a map from the NC name to the index
+     */
     private Map<String, Integer> ncNameToIndex = new HashMap<>();
 
-    /** a map from NC name to the NodeControllerInfo */
+    /**
+     * a map from NC name to the NodeControllerInfo
+     */
     private Map<String, NodeControllerInfo> ncNameToNcInfos;
 
     /**
@@ -76,13 +85,15 @@
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
+     * @param ipAddress      IP address
+     * @param port           Port
+     * @param channelFactory Channel Factory
      * @throws HyracksException
      */
 
-    public Scheduler(String ipAddress, int port) throws HyracksException {
+    public Scheduler(String ipAddress, int port, ISocketChannelFactory channelFactory) throws HyracksException {
         try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port, channelFactory);
             this.ncNameToNcInfos = hcc.getNodeControllerInfos();
             ClusterTopology topology = hcc.getClusterTopology();
             this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
@@ -96,23 +107,6 @@
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
-            this.ncCollectionBuilder = ncCollectionBuilder;
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-
-    /**
-     * The constructor of the scheduler.
-     *
      * @param ncNameToNcInfos the mapping from nc names to nc infos
      * @throws HyracksException
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
index ddf140f..a26a5f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
 
@@ -44,15 +45,14 @@
      * @param ncNameToNcInfos
      * @throws HyracksException
      */
-    public Scheduler(String ipAddress, int port) throws HyracksException {
-        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
+    public Scheduler(String ipAddress, int port, ISocketChannelFactory channelFactory) throws HyracksException {
+        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port, channelFactory);
     }
 
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
@@ -62,10 +62,8 @@
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @param topology
-     *            the hyracks cluster toplogy
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
+     * @param topology        the hyracks cluster toplogy
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
@@ -76,8 +74,7 @@
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17224
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I64f9f4a30a6564aac26e99433ccc695975a22a3b
Gerrit-Change-Number: 17224
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <wa...@gmail.com>
Gerrit-MessageType: newchange