You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/02/22 07:05:39 UTC

[GitHub] [incubator-inlong] healchow commented on a change in pull request #2644: [INLONG-2642][Sort] Use proxy user to write hive

healchow commented on a change in pull request #2644:
URL: https://github.com/apache/incubator-inlong/pull/2644#discussion_r811629189



##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
##########
@@ -91,39 +112,79 @@ public HiveWriter(Configuration configuration, long dataFlowId, HiveSinkInfo hiv
 
     @Override
     public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
-        newPartitions = new ArrayList<>();
-        fileWriterContext = new FileWriterContext();
-        fileWriter.setRuntimeContext(getRuntimeContext());
-        fileWriter.initializeState(functionInitializationContext);
+        doAsWithUGI(proxyUgi, () -> {
+            newPartitions = new ArrayList<>();
+            fileWriterContext = new FileWriterContext();
+            fileWriter.setRuntimeContext(getRuntimeContext());
+            fileWriter.initializeState(functionInitializationContext);
+            return null;
+        });
     }
 
     @Override
     public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-        fileWriter.open(parameters);
+//        System.out.println("init:" + proxyUgi);

Review comment:
       Please remove this commented-out debug statement (`System.out.println`); it is unused code.

##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
##########
@@ -68,9 +74,24 @@
 
     private transient List<HivePartition> newPartitions;
 
+    private static UserGroupInformation proxyUgi;
+
     public HiveWriter(Configuration configuration, long dataFlowId, HiveSinkInfo hiveSinkInfo) {
         this.configuration = checkNotNull(configuration);
         this.dataFlowId = dataFlowId;
+        UserGroupInformation realUgi = null;
+        try {
+            realUgi = UserGroupInformation.getLoginUser();
+        } catch (IOException e) {
+            e.printStackTrace();

Review comment:
       Please log the exception through the logging framework (LOG) instead of calling `e.printStackTrace()`.

##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
##########
@@ -68,9 +74,24 @@
 
     private transient List<HivePartition> newPartitions;
 
+    private static UserGroupInformation proxyUgi;
+
     public HiveWriter(Configuration configuration, long dataFlowId, HiveSinkInfo hiveSinkInfo) {
         this.configuration = checkNotNull(configuration);
         this.dataFlowId = dataFlowId;
+        UserGroupInformation realUgi = null;
+        try {
+            realUgi = UserGroupInformation.getLoginUser();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+//        System.out.println("superUser:" + realUgi.getUserName());
+        String proxyUser = hiveSinkInfo.getHadoopProxyUser();
+//        System.out.println("proxyUser:" + proxyUser);

Review comment:
       Please remove the commented-out `System.out.println` debug statements; they are unused code.

##########
File path: inlong-sort/sort-single-tenant/pom.xml
##########
@@ -65,6 +65,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <!--        <dependency>-->

Review comment:
       Please remove the commented-out `<dependency>` block; it is unused.

##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveWriter.java
##########
@@ -91,39 +112,79 @@ public HiveWriter(Configuration configuration, long dataFlowId, HiveSinkInfo hiv
 
     @Override
     public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
-        newPartitions = new ArrayList<>();
-        fileWriterContext = new FileWriterContext();
-        fileWriter.setRuntimeContext(getRuntimeContext());
-        fileWriter.initializeState(functionInitializationContext);
+        doAsWithUGI(proxyUgi, () -> {
+            newPartitions = new ArrayList<>();
+            fileWriterContext = new FileWriterContext();
+            fileWriter.setRuntimeContext(getRuntimeContext());
+            fileWriter.initializeState(functionInitializationContext);
+            return null;
+        });
     }
 
     @Override
     public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-        fileWriter.open(parameters);
+//        System.out.println("init:" + proxyUgi);
+        doAsWithUGI(proxyUgi, () -> {
+            fileWriter.open(parameters);
+            return null;
+        });
+
     }
 
     @Override
     public void close() throws Exception {
-        fileWriter.close();
+        doAsWithUGI(proxyUgi, () -> {
+            fileWriter.close();
+            return null;
+        });
     }
 
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
-        fileWriter.snapshotState(functionSnapshotContext);
+        doAsWithUGI(proxyUgi, () -> {
+            fileWriter.snapshotState(functionSnapshotContext);
+            return null;
+        });
     }
 
     @Override
     public void processElement(Row in, Context context, Collector<PartitionCommitInfo> collector) throws Exception {
-        fileWriter.invoke(in, fileWriterContext.setContext(context));
-        if (!newPartitions.isEmpty()) {
-            collector.collect(new PartitionCommitInfo(dataFlowId, new ArrayList<>(newPartitions)));
-            newPartitions.clear();
-        }
+//        System.out.println("processs:" + proxyUgi);
+        doAsWithUGI(proxyUgi, () -> {
+            fileWriter.invoke(in, fileWriterContext.setContext(context));
+            if (!newPartitions.isEmpty()) {
+                collector.collect(new PartitionCommitInfo(dataFlowId, new ArrayList<>(newPartitions)));
+                newPartitions.clear();
+            }
+            return null;
+        });
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        fileWriter.notifyCheckpointComplete(checkpointId);
+        doAsWithUGI(proxyUgi, () -> {
+            fileWriter.notifyCheckpointComplete(checkpointId);
+            return null;
+        });
+    }
+
+    public static <R> R doAsWithUGI(UserGroupInformation ugi, Callable<R> callable) throws IOException {

Review comment:
       Please add Javadoc for the public static method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org