Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/01/10 07:16:59 UTC

[GitHub] [incubator-inlong] luchunliang opened a new pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

luchunliang opened a new pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129


   ### Title Name: [INLONG-1895][Inlong-Sort-Standalone] Inlong-Sort-Standalone support to sort the events to Hive cluster.
   
   Fixes #1895 
   
   ### Motivation
   
   *Explain the context here, and why you're making this change. What is the problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If the feature is not applicable for documentation, explain why.
     - If the feature is not yet documented in this PR, please create a follow-up issue for adding the documentation.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#issuecomment-1008606601


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2129](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aa65037) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/f51179a30f75a910f3d17e6192195753ed83f31a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f51179a) will **decrease** coverage by `0.04%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2129/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2129      +/-   ##
   ============================================
   - Coverage     12.26%   12.22%   -0.05%     
   + Complexity     1159     1157       -2     
   ============================================
     Files           413      413              
     Lines         35215    35243      +28     
     Branches       5542     5544       +2     
   ============================================
   - Hits           4320     4307      -13     
   - Misses        30125    30173      +48     
   + Partials        770      763       -7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...nlong/tubemq/corebase/policies/FlowCtrlResult.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJlc3VsdC5qYXZh) | `0.00% <0.00%> (-60.00%)` | :arrow_down: |
   | [.../inlong/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `36.66% <0.00%> (-2.23%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `32.96% <0.00%> (-1.11%)` | :arrow_down: |
   | [...he/inlong/tubemq/server/tools/cli/CliConsumer.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci90b29scy9jbGkvQ2xpQ29uc3VtZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...he/inlong/tubemq/server/tools/cli/CliProducer.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci90b29scy9jbGkvQ2xpUHJvZHVjZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f51179a...aa65037](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783740727



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
##########
@@ -75,19 +75,22 @@ public void addEvent(ProfileEvent event) {
             this.needOutputOvertimeData.set(false);
         }
         // parse
-        String uid = event.getUid();
+        String eventUid = event.getUid();
+        long dispatchTime = event.getRawLogTime() - event.getRawLogTime() % 60000L;

Review comment:
       Fixed it.
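The changed line in the diff above buckets `event.getRawLogTime()` down to the start of its minute (60000 ms), so events in the same minute share one dispatch window. A minimal sketch of that computation, assuming the timestamp is epoch milliseconds (the class and method names here are illustrative, not from the PR):

```java
public class DispatchTimeSketch {

    /** Milliseconds per minute, matching the 60000L literal in the diff. */
    private static final long MINUTE_MS = 60_000L;

    /** Floor an epoch-millisecond timestamp to the start of its minute. */
    static long toMinuteBucket(long rawLogTimeMs) {
        return rawLogTimeMs - rawLogTimeMs % MINUTE_MS;
    }
}
```

All timestamps inside one minute map to the same bucket value, which is what lets the dispatcher group events by (uid, dispatchTime).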







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r785592198



##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(

Review comment:
       This is a sample Hive table; the names are not meaningful.

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_09c00014434(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_0c900035509(

Review comment:
       This is a sample Hive table; the names are not meaningful.

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_09c00014434(

Review comment:
       This is a sample Hive table; the names are not meaningful.

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(

Review comment:
       This is a sample Hive table; the names are not meaningful.
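The sample tables above are partitioned by a `dt` string, and elsewhere the sink derives such partition values from event timestamps (`idConfig.parsePartitionField(pt)`). A hypothetical sketch of that kind of rendering in Java — the `yyyyMMddHH` hourly pattern and class name are assumptions for illustration, not taken from the PR:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionValueSketch {

    // Hypothetical hourly partition format; the PR's parsePartitionField may differ.
    private static final DateTimeFormatter DT_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHH").withZone(ZoneOffset.UTC);

    /** Render an epoch-millisecond timestamp as an hourly partition value for `dt`. */
    static String toPartitionValue(long epochMs) {
        return DT_FORMAT.format(Instant.ofEpochMilli(epochMs));
    }
}
```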







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781938738



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/PartitionLeaderElectionRunnable.java
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * PartitionLeaderElectionRunnable
+ */
+public class PartitionLeaderElectionRunnable implements Runnable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSinkContext.class);
+    private final HiveSinkContext context;
+    private Map<String, PartitionCreateRunnable> partitionCreateMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     * 
+     * @param context
+     */
+    public PartitionLeaderElectionRunnable(HiveSinkContext context) {
+        this.context = context;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        // clear stopped runnable
+        Set<String> uidPartitions = new HashSet<>();
+        for (Entry<String, PartitionCreateRunnable> entry : this.partitionCreateMap.entrySet()) {
+            if (entry.getValue().getState() != PartitionState.INIT
+                    && entry.getValue().getState() != PartitionState.CREATING) {
+                uidPartitions.add(entry.getKey());
+            }
+        }
+        uidPartitions.forEach(item -> this.partitionCreateMap.remove(item));
+        // add runnable
+        try (Connection conn = context.getHiveConnection()) {
+            Map<String, HdfsIdConfig> idConfigMap = context.getIdConfigMap();
+            ExecutorService partitionCreatePool = context.getPartitionCreatePool();
+            Statement stat = conn.createStatement();
+            for (Entry<String, HdfsIdConfig> entry : idConfigMap.entrySet()) {
+                if (hasToken(entry.getValue())) {
+                    HdfsIdConfig idConfig = entry.getValue();
+                    // get partition list of table
+                    String tableName = idConfig.getHiveTableName();
+                    ResultSet rs = stat.executeQuery("show partitions " + tableName);
+                    Set<String> partitionSet = new HashSet<>();
+                    while (rs.next()) {
+                        String strPartition = rs.getString(1);
+                        int index = strPartition.indexOf('=');
+                        if (index < 0) {
+                            continue;
+                        }
+                        partitionSet.add(strPartition.substring(index + 1));
+                    }
+                    rs.close();
+                    // close partition
+                    long currentTime = System.currentTimeMillis();
+                    long beginScanTime = currentTime
+                            - 2 * idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;
+                    long endScanTime = currentTime - idConfig.getPartitionIntervalMs()
+                            - context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS;
+                    long forceCloseTime = currentTime - idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;
+                    for (long pt = beginScanTime; pt < endScanTime; pt += idConfig.getPartitionIntervalMs()) {
+                        String strPartitionValue = idConfig.parsePartitionField(pt);
+                        if (partitionSet.contains(strPartitionValue)) {
+                            continue;
+                        }
+                        boolean isForce = (pt < forceCloseTime);
+                        // force to close partition that has overtimed.
+                        // try to close partition that has no new data and can be closed.
+                        // clear "outtmp" directory.
+                        // merge and copy files in "in" directory to "outtmp" directory.
+                        // rename outtmp file to "out" directory.
+                        // delete file in "in" directory.
+                        // execute the sql of adding partition.
+                        String inlongGroupId = idConfig.getInlongGroupId();
+                        String inlongStreamId = idConfig.getInlongStreamId();
+                        String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+                        String uidPartitionKey = uid + "." + strPartitionValue;
+
+                        PartitionCreateRunnable createTask = this.partitionCreateMap.get(uidPartitionKey);
+                        if (createTask != null) {
+                            createTask.setForce(isForce);
+                            continue;
+                        }
+                        createTask = new PartitionCreateRunnable(context, idConfig,
+                                strPartitionValue, pt, isForce);
+                        this.partitionCreateMap.put(uidPartitionKey, createTask);
+                        partitionCreatePool.execute(createTask);
+
+                    }
+                }
+            }
+            stat.close();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * hasToken
+     * 
+     * @param  idConfig
+     * @return
+     */
+    private boolean hasToken(HdfsIdConfig idConfig) {
+        String containerName = context.getNodeId();
+        String strHdfsPath = context.getHdfsPath();
+        String strIdRootPath = strHdfsPath + idConfig.getIdRootPath();
+        String strTokenFile = strIdRootPath + "/token";
+        FileSystem fs = null;
+        try {
+            // check if token file exists
+            Path hdfsRootPath = new Path(strHdfsPath);
+            fs = hdfsRootPath.getFileSystem(new Configuration());
+            Path tokenPath = new Path(strTokenFile);
+            if (!fs.exists(tokenPath)) {
+                // create token file
+                FSDataOutputStream fsdos = fs.create(tokenPath, true);
+                // write container name to file
+                fsdos.writeChars(containerName);
+                fsdos.flush();
+                fsdos.close();
+                LOG.info("node:{} get id token:inlongGroupId:{},inlongStreamId:{} because token file is not existed.",
+                        containerName, idConfig.getInlongGroupId(), idConfig.getInlongStreamId());
+                return true;
+            } else {
+                // check if last modified time of token file is over
+                FileStatus tokenFileStatus = fs.getFileStatus(tokenPath);
+                long tokenOvertime = context.getTokenOvertimeMinute() * HiveSinkContext.MINUTE_MS;
+                if (System.currentTimeMillis() - tokenFileStatus.getModificationTime() < tokenOvertime
+                        && tokenFileStatus.getLen() < 1024) {

Review comment:
       Magic number: consider extracting 1024 as a named constant.
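The `hasToken` method in the diff above implements a simple file-based lease on HDFS: a node owns an ID if the token file is missing (it creates one) or if the existing token is stale. A minimal sketch of the same idea against a local file instead of HDFS — the class name, the exact takeover semantics, and the `MAX_TOKEN_FILE_BYTES` constant (standing in for the bare 1024) are assumptions for illustration, not the PR's implementation:

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class TokenLeaseSketch {

    /** Named constant standing in for the bare 1024 sanity limit in the diff. */
    static final long MAX_TOKEN_FILE_BYTES = 1024L;

    /**
     * Try to acquire the lease: succeed if the token file is absent (create it),
     * held by this node, or stale (older than overtimeMs, or suspiciously large).
     */
    static boolean tryAcquire(File tokenFile, String nodeId, long overtimeMs) throws IOException {
        if (!tokenFile.exists()) {
            // No holder: create the token with this node's id.
            Files.write(tokenFile.toPath(), nodeId.getBytes(StandardCharsets.UTF_8));
            return true;
        }
        long age = System.currentTimeMillis() - tokenFile.lastModified();
        boolean heldAndValid = age < overtimeMs && tokenFile.length() < MAX_TOKEN_FILE_BYTES;
        if (heldAndValid) {
            // Fresh token: acquire only if this node already holds it.
            String owner = new String(Files.readAllBytes(tokenFile.toPath()), StandardCharsets.UTF_8);
            return owner.equals(nodeId);
        }
        // Stale or corrupt token: take it over.
        Files.write(tokenFile.toPath(), nodeId.getBytes(StandardCharsets.UTF_8));
        return true;
    }
}
```

Note this scheme is only advisory: the exists/create pair is not atomic, so two nodes racing on the same missing token can both "win". HDFS's `create(path, overwrite=false)` narrows, but does not fully close, that window.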







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783752584



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007
+                LOG.error("can not find uid:{},idConfigMap:{}", uid, JSON.toJSONString(context.getIdConfigMap()));
+                this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                dispatchProfile = this.dispatchQueue.poll();
+                continue;
+            }
+            String strIdRootPath = idConfig.parsePartitionPath(dispatchProfile.getDispatchTime());
+            HdfsIdFile idFile = this.hdfsIdFileMap.get(strIdRootPath);
+            if (idFile == null) {
+                try {
+                    idFile = new HdfsIdFile(context, idConfig, strIdRootPath);
+                } catch (Exception e) {
+                    // monitor 007
+                    LOG.error("can not connect to hdfsPath:{},write file:{}", context.getHdfsPath(), strIdRootPath);
+                    this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                    dispatchProfile = this.dispatchQueue.poll();

Review comment:
       The config of this id is invalid, so the data of this id will be missed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781818260



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/DefaultEventFormatHandler.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+/**
+ * 
+ * DefaultEventFormatHandler
+ */
+public class DefaultEventFormatHandler implements IEventFormatHandler {
+
+    /**
+     * format
+     * 

Review comment:
        explain what the protocol consists of here







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781891801



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/PartitionLeaderElectionRunnable.java
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * PartitionLeaderElectionRunnable
+ */
+public class PartitionLeaderElectionRunnable implements Runnable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSinkContext.class);
+    private final HiveSinkContext context;
+    private Map<String, PartitionCreateRunnable> partitionCreateMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     * 
+     * @param context
+     */
+    public PartitionLeaderElectionRunnable(HiveSinkContext context) {
+        this.context = context;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        // clear stopped runnable
+        Set<String> uidPartitions = new HashSet<>();
+        for (Entry<String, PartitionCreateRunnable> entry : this.partitionCreateMap.entrySet()) {
+            if (entry.getValue().getState() != PartitionState.INIT
+                    && entry.getValue().getState() != PartitionState.CREATING) {
+                uidPartitions.add(entry.getKey());
+            }
+        }
+        uidPartitions.forEach(item -> this.partitionCreateMap.remove(item));
+        // add runnable
+        try (Connection conn = context.getHiveConnection()) {
+            Map<String, HdfsIdConfig> idConfigMap = context.getIdConfigMap();
+            ExecutorService partitionCreatePool = context.getPartitionCreatePool();
+            Statement stat = conn.createStatement();
+            for (Entry<String, HdfsIdConfig> entry : idConfigMap.entrySet()) {
+                if (hasToken(entry.getValue())) {
+                    HdfsIdConfig idConfig = entry.getValue();
+                    // get partition list of table
+                    String tableName = idConfig.getHiveTableName();
+                    ResultSet rs = stat.executeQuery("show partitions " + tableName);
+                    Set<String> partitionSet = new HashSet<>();
+                    while (rs.next()) {
+                        String strPartition = rs.getString(1);
+                        int index = strPartition.indexOf('=');
+                        if (index < 0) {
+                            continue;
+                        }
+                        partitionSet.add(strPartition.substring(index + 1));
+                    }
+                    rs.close();
+                    // close partition
+                    long currentTime = System.currentTimeMillis();
+                    long beginScanTime = currentTime
+                            - 2 * idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;
+                    long endScanTime = currentTime - idConfig.getPartitionIntervalMs()
+                            - context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS;
+                    long forceCloseTime = currentTime - idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;

Review comment:
       Suggestion: extract the logic here (97-102) into a separate method.
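A minimal sketch of what the suggested extraction could look like (the class and field names below are illustrative, not part of the PR): the three time-window calculations are pulled out of the loop body into one self-contained value object.

```java
// Hypothetical helper extracted from the partition-scan loop.
// Field and class names are illustrative; only the arithmetic mirrors the quoted code.
public class PartitionScanWindow {

    public static final long HOUR_MS = 60L * 60 * 1000;
    public static final long MINUTE_MS = 60L * 1000;

    public final long beginScanTime;
    public final long endScanTime;
    public final long forceCloseTime;

    public PartitionScanWindow(long currentTime, long maxPartitionOpenDelayHour,
            long partitionIntervalMs, long maxFileOpenDelayMinute) {
        // scan window opens two full "max open delay" periods in the past
        this.beginScanTime = currentTime - 2 * maxPartitionOpenDelayHour * HOUR_MS;
        // scan window closes one partition interval plus the file-open delay ago
        this.endScanTime = currentTime - partitionIntervalMs
                - maxFileOpenDelayMinute * MINUTE_MS;
        // partitions older than the max open delay are force-closed
        this.forceCloseTime = currentTime - maxPartitionOpenDelayHour * HOUR_MS;
    }
}
```

With this in place, the loop body would construct one `PartitionScanWindow` per id config, which keeps the time arithmetic testable in isolation.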







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783751406



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007
+                LOG.error("can not find uid:{},idConfigMap:{}", uid, JSON.toJSONString(context.getIdConfigMap()));
+                this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                dispatchProfile = this.dispatchQueue.poll();
+                continue;
+            }
+            String strIdRootPath = idConfig.parsePartitionPath(dispatchProfile.getDispatchTime());
+            HdfsIdFile idFile = this.hdfsIdFileMap.get(strIdRootPath);
+            if (idFile == null) {
+                try {
+                    idFile = new HdfsIdFile(context, idConfig, strIdRootPath);
+                } catch (Exception e) {
+                    // monitor 007

Review comment:
       remove 007







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781864270



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007
+                LOG.error("can not find uid:{},idConfigMap:{}", uid, JSON.toJSONString(context.getIdConfigMap()));
+                this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                dispatchProfile = this.dispatchQueue.poll();
+                continue;
+            }
+            String strIdRootPath = idConfig.parsePartitionPath(dispatchProfile.getDispatchTime());
+            HdfsIdFile idFile = this.hdfsIdFileMap.get(strIdRootPath);
+            if (idFile == null) {
+                try {
+                    idFile = new HdfsIdFile(context, idConfig, strIdRootPath);
+                } catch (Exception e) {
+                    // monitor 007

Review comment:
       This comment is not suitable; it is better to delete "007".







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781861211



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);

Review comment:
       it's better to use private







[GitHub] [incubator-inlong] imvan commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781818051



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HdfsIdConfig.java
##########
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 
+ * HdfsIdConfig
+ */
+public class HdfsIdConfig {
+
+    public static final String PATTERN_DAY = "{yyyyMMdd}";
+    public static final String PATTERN_HOUR = "{yyyyMMddHH}";
+    public static final String PATTERN_MINUTE = "{yyyyMMddHHmm}";
+    public static final String REGEX_DAY = "\\{yyyyMMdd\\}";
+    public static final String REGEX_HOUR = "\\{yyyyMMddHH\\}";
+    public static final String REGEX_MINUTE = "\\{yyyyMMddHHmm\\}";
+    public static final long HOUR_MS = 60 * 60 * 1000;

Review comment:
       long type; it is better to end the literal with "L"
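A small illustration of why the `L` suffix matters (the constant names below are hypothetical, chosen to show the failure mode): without the suffix the right-hand side is evaluated in `int` arithmetic and can silently overflow before being widened to `long`.

```java
// Hypothetical constants demonstrating int overflow in a long initializer.
public class MillisConstants {

    // 30 * 24 * 60 * 60 * 1000 = 2,592,000,000 exceeds Integer.MAX_VALUE,
    // so the int multiplication wraps around BEFORE widening to long.
    public static final long BAD_MONTH_MS = 30 * 24 * 60 * 60 * 1000;

    // Making one operand long forces long arithmetic throughout.
    public static final long GOOD_MONTH_MS = 30L * 24 * 60 * 60 * 1000;
}
```

For `60 * 60 * 1000` the value happens to fit in an `int`, so the PR's constant is not wrong today, but the suffix makes the intent explicit and protects against a later edit tipping the product over the `int` range.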







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781890490



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/PartitionLeaderElectionRunnable.java
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * PartitionLeaderElectionRunnable
+ */
+public class PartitionLeaderElectionRunnable implements Runnable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSinkContext.class);
+    private final HiveSinkContext context;
+    private Map<String, PartitionCreateRunnable> partitionCreateMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     * 
+     * @param context
+     */
+    public PartitionLeaderElectionRunnable(HiveSinkContext context) {
+        this.context = context;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        // clear stopped runnable
+        Set<String> uidPartitions = new HashSet<>();
+        for (Entry<String, PartitionCreateRunnable> entry : this.partitionCreateMap.entrySet()) {
+            if (entry.getValue().getState() != PartitionState.INIT
+                    && entry.getValue().getState() != PartitionState.CREATING) {
+                uidPartitions.add(entry.getKey());
+            }
+        }
+        uidPartitions.forEach(item -> this.partitionCreateMap.remove(item));
+        // add runnable
+        try (Connection conn = context.getHiveConnection()) {
+            Map<String, HdfsIdConfig> idConfigMap = context.getIdConfigMap();
+            ExecutorService partitionCreatePool = context.getPartitionCreatePool();
+            Statement stat = conn.createStatement();
+            for (Entry<String, HdfsIdConfig> entry : idConfigMap.entrySet()) {
+                if (hasToken(entry.getValue())) {
+                    HdfsIdConfig idConfig = entry.getValue();
+                    // get partition list of table
+                    String tableName = idConfig.getHiveTableName();
+                    ResultSet rs = stat.executeQuery("show partitions " + tableName);
+                    Set<String> partitionSet = new HashSet<>();
+                    while (rs.next()) {
+                        String strPartition = rs.getString(1);
+                        int index = strPartition.indexOf('=');
+                        if (index < 0) {
+                            continue;
+                        }
+                        partitionSet.add(strPartition.substring(index + 1));
+                    }
+                    rs.close();
+                    // close partition
+                    long currentTime = System.currentTimeMillis();
+                    long beginScanTime = currentTime

Review comment:
       Suggestion: extract this time-window computation into a separate method.
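
   For illustration, the extraction might look like the sketch below. `ScanWindow` and its factory method are hypothetical names; the arithmetic mirrors the inline code in the diff:

```java
// Illustrative only: extract the inline scan-window arithmetic from run()
// into a small value object with a factory method, as the reviewer suggests.
public class ScanWindow {

    static final long HOUR_MS = 60L * 60 * 1000;
    static final long MINUTE_MS = 60L * 1000;

    final long beginScanTime;   // earliest partition time to scan
    final long endScanTime;     // latest partition time eligible for closing
    final long forceCloseTime;  // partitions older than this are force-closed

    private ScanWindow(long begin, long end, long force) {
        this.beginScanTime = begin;
        this.endScanTime = end;
        this.forceCloseTime = force;
    }

    // Same formulas as the inline code in the diff, gathered in one place.
    static ScanWindow of(long currentTime, long maxPartitionOpenDelayHour,
                         long partitionIntervalMs, long maxFileOpenDelayMinute) {
        long begin = currentTime - 2 * maxPartitionOpenDelayHour * HOUR_MS;
        long end = currentTime - partitionIntervalMs
                - maxFileOpenDelayMinute * MINUTE_MS;
        long force = currentTime - maxPartitionOpenDelayHour * HOUR_MS;
        return new ScanWindow(begin, end, force);
    }
}
```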







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783751164



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007
+                LOG.error("can not find uid:{},idConfigMap:{}", uid, JSON.toJSONString(context.getIdConfigMap()));
+                this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                dispatchProfile = this.dispatchQueue.poll();
+                continue;
+            }
+            String strIdRootPath = idConfig.parsePartitionPath(dispatchProfile.getDispatchTime());
+            HdfsIdFile idFile = this.hdfsIdFileMap.get(strIdRootPath);
+            if (idFile == null) {
+                try {
+                    idFile = new HdfsIdFile(context, idConfig, strIdRootPath);
+                } catch (Exception e) {
+                    // monitor 007
+                    LOG.error("can not connect to hdfsPath:{},write file:{}", context.getHdfsPath(), strIdRootPath);
+                    this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                    dispatchProfile = this.dispatchQueue.poll();
+                    continue;
+                }
+                this.hdfsIdFileMap.put(strIdRootPath, idFile);
+            }
+            idFile.setModifiedTime(currentTime);
+            // new runnable
+            WriteHdfsFileRunnable writeTask = new WriteHdfsFileRunnable(context, idFile, dispatchProfile);
+            context.getOutputPool().execute(writeTask);
+            dispatchProfile = this.dispatchQueue.poll();
+        }
+        // close overtime file
+        long overtime = currentTime - context.getFileArchiveDelayMinute() * HiveSinkContext.MINUTE_MS;
+        Set<String> overtimePathSet = new HashSet<>();
+        for (Entry<String, HdfsIdFile> entry : this.hdfsIdFileMap.entrySet()) {
+            if (entry.getValue().getModifiedTime() < overtime) {
+                overtimePathSet.add(entry.getKey());
+                entry.getValue().close();
+            }
+        }
+        // remove key
+        overtimePathSet.forEach((item) -> this.hdfsIdFileMap.remove(item));

Review comment:
       Closed files must be removed from hdfsIdFileMap here; otherwise the map grows unbounded and will eventually cause an OOM.
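
   A minimal sketch of the close-and-evict pattern described here, with a stand-in `IdFile` type in place of `HdfsIdFile` (the class and method names are illustrative):

```java
import java.util.Map;

// Sketch of the eviction logic: close files that have not been written to
// recently and drop them from the map so the map does not grow without bound.
public class IdFileEvictor {

    // Minimal stand-in for HdfsIdFile.
    static class IdFile {
        long modifiedTime;
        boolean closed;
        IdFile(long t) { this.modifiedTime = t; }
        void close() { this.closed = true; }
    }

    // ConcurrentHashMap iterators are weakly consistent, so removing entries
    // via entrySet().removeIf() is safe while other threads read the map.
    static void evictOvertime(Map<String, IdFile> files, long overtime) {
        files.entrySet().removeIf(e -> {
            if (e.getValue().modifiedTime < overtime) {
                e.getValue().close();
                return true;
            }
            return false;
        });
    }
}
```

   Compared with the two-pass version in the diff (collect keys, then remove), `removeIf` closes and evicts in a single pass.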







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783749999



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/DefaultEventFormatHandler.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+/**
+ * 
+ * DefaultEventFormatHandler
+ */
+public class DefaultEventFormatHandler implements IEventFormatHandler {
+
+    /**
+     * format
+     * 

Review comment:
       Add the protocol content (the layout of the formatted record) to this Javadoc.







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783741741



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/DefaultEventFormatHandler.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+/**
+ * 
+ * DefaultEventFormatHandler
+ */
+public class DefaultEventFormatHandler implements IEventFormatHandler {
+
+    /**
+     * format
+     * 
+     * @param  event
+     * @param  idConfig
+     * @return
+     */
+    @Override
+    public byte[] format(ProfileEvent event, HdfsIdConfig idConfig) {
+        long msgTime = event.getRawLogTime();
+        String partitionFieldValue = idConfig.parsePartitionField(msgTime);
+        String msgTimeFieldValue = idConfig.parseMsgTimeField(msgTime);
+        byte[] partitionFieldBytes = partitionFieldValue.getBytes();
+        byte[] msgTimeFieldBytes = msgTimeFieldValue.getBytes();
+        byte[] formatBytes = new byte[partitionFieldBytes.length + 1 + msgTimeFieldBytes.length + 1

Review comment:
       Add a named constant for the separator length instead of the magic number 1.
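
   One possible shape for that constant is sketched below. The tab separator is an assumption, since the actual delimiter byte is outside the quoted hunk; `RecordFormat` is an illustrative class name:

```java
// Sketch of the "add constant variable" note: name the magic 1s that account
// for the field-separator bytes when sizing the output record.
public class RecordFormat {

    public static final byte SEPARATOR = (byte) '\t'; // assumed delimiter
    public static final int SEPARATOR_LENGTH = 1;

    public static byte[] join(byte[] partitionField, byte[] msgTimeField, byte[] body) {
        byte[] out = new byte[partitionField.length + SEPARATOR_LENGTH
                + msgTimeField.length + SEPARATOR_LENGTH + body.length];
        int pos = 0;
        System.arraycopy(partitionField, 0, out, pos, partitionField.length);
        pos += partitionField.length;
        out[pos++] = SEPARATOR;
        System.arraycopy(msgTimeField, 0, out, pos, msgTimeField.length);
        pos += msgTimeField.length;
        out[pos++] = SEPARATOR;
        System.arraycopy(body, 0, out, pos, body.length);
        return out;
    }
}
```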







[GitHub] [incubator-inlong] codecov-commenter commented on pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#issuecomment-1008606601


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2129](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (132c721) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/f51179a30f75a910f3d17e6192195753ed83f31a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f51179a) will **increase** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2129/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2129      +/-   ##
   ============================================
   + Coverage     12.26%   12.28%   +0.01%     
   + Complexity     1159     1157       -2     
   ============================================
     Files           413      413              
     Lines         35215    35215              
     Branches       5542     5542              
   ============================================
   + Hits           4320     4325       +5     
   + Misses        30125    30119       -6     
   - Partials        770      771       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `45.31% <0.00%> (-0.79%)` | :arrow_down: |
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsdW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbHVtZS9zaW5rL3R1YmVtcS9UdWJlbXFTaW5rLmphdmE=) | `55.42% <0.00%> (+4.00%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f51179a...132c721](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783755982



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/PartitionLeaderElectionRunnable.java
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * PartitionLeaderElectionRunnable
+ */
+public class PartitionLeaderElectionRunnable implements Runnable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSinkContext.class);
+    private final HiveSinkContext context;
+    private Map<String, PartitionCreateRunnable> partitionCreateMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     * 
+     * @param context
+     */
+    public PartitionLeaderElectionRunnable(HiveSinkContext context) {
+        this.context = context;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        // clear stopped runnable
+        Set<String> uidPartitions = new HashSet<>();
+        for (Entry<String, PartitionCreateRunnable> entry : this.partitionCreateMap.entrySet()) {
+            if (entry.getValue().getState() != PartitionState.INIT
+                    && entry.getValue().getState() != PartitionState.CREATING) {
+                uidPartitions.add(entry.getKey());
+            }
+        }
+        uidPartitions.forEach(item -> this.partitionCreateMap.remove(item));
+        // add runnable
+        try (Connection conn = context.getHiveConnection()) {
+            Map<String, HdfsIdConfig> idConfigMap = context.getIdConfigMap();
+            ExecutorService partitionCreatePool = context.getPartitionCreatePool();
+            Statement stat = conn.createStatement();
+            for (Entry<String, HdfsIdConfig> entry : idConfigMap.entrySet()) {
+                if (hasToken(entry.getValue())) {
+                    HdfsIdConfig idConfig = entry.getValue();
+                    // get partition list of table
+                    String tableName = idConfig.getHiveTableName();
+                    ResultSet rs = stat.executeQuery("show partitions " + tableName);
+                    Set<String> partitionSet = new HashSet<>();
+                    while (rs.next()) {
+                        String strPartition = rs.getString(1);
+                        int index = strPartition.indexOf('=');
+                        if (index < 0) {
+                            continue;
+                        }
+                        partitionSet.add(strPartition.substring(index + 1));
+                    }
+                    rs.close();
+                    // close partition
+                    long currentTime = System.currentTimeMillis();
+                    long beginScanTime = currentTime
+                            - 2 * idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;
+                    long endScanTime = currentTime - idConfig.getPartitionIntervalMs()
+                            - context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS;
+                    long forceCloseTime = currentTime - idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;
+                    for (long pt = beginScanTime; pt < endScanTime; pt += idConfig.getPartitionIntervalMs()) {
+                        String strPartitionValue = idConfig.parsePartitionField(pt);
+                        if (partitionSet.contains(strPartitionValue)) {
+                            continue;
+                        }
+                        boolean isForce = (pt < forceCloseTime);
+                        // force to close partition that has overtimed.
+                        // try to close partition that has no new data and can be closed.
+                        // clear "outtmp" directory.
+                        // merge and copy files in "in" directory to "outtmp" directory.
+                        // rename outtmp file to "out" directory.
+                        // delete file in "in" directory.
+                        // execute the sql of adding partition.
+                        String inlongGroupId = idConfig.getInlongGroupId();
+                        String inlongStreamId = idConfig.getInlongStreamId();
+                        String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+                        String uidPartitionKey = uid + "." + strPartitionValue;
+
+                        PartitionCreateRunnable createTask = this.partitionCreateMap.get(uidPartitionKey);
+                        if (createTask != null) {
+                            createTask.setForce(isForce);
+                            continue;
+                        }
+                        createTask = new PartitionCreateRunnable(context, idConfig,
+                                strPartitionValue, pt, isForce);
+                        this.partitionCreateMap.put(uidPartitionKey, createTask);
+                        partitionCreatePool.execute(createTask);
+
+                    }
+                }
+            }
+            stat.close();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * hasToken
+     * 
+     * @param  idConfig
+     * @return
+     */
+    private boolean hasToken(HdfsIdConfig idConfig) {
+        String containerName = context.getNodeId();
+        String strHdfsPath = context.getHdfsPath();
+        String strIdRootPath = strHdfsPath + idConfig.getIdRootPath();
+        String strTokenFile = strIdRootPath + "/token";
+        FileSystem fs = null;
+        try {
+            // check if token file exists
+            Path hdfsRootPath = new Path(strHdfsPath);
+            fs = hdfsRootPath.getFileSystem(new Configuration());
+            Path tokenPath = new Path(strTokenFile);
+            if (!fs.exists(tokenPath)) {
+                // create token file
+                FSDataOutputStream fsdos = fs.create(tokenPath, true);
+                // write container name to file
+                fsdos.writeChars(containerName);
+                fsdos.flush();
+                fsdos.close();
+                LOG.info("node:{} get id token:inlongGroupId:{},inlongStreamId:{} because token file is not existed.",
+                        containerName, idConfig.getInlongGroupId(), idConfig.getInlongStreamId());
+                return true;
+            } else {
+                // check if last modified time of token file is over
+                FileStatus tokenFileStatus = fs.getFileStatus(tokenPath);
+                long tokenOvertime = context.getTokenOvertimeMinute() * HiveSinkContext.MINUTE_MS;
+                if (System.currentTimeMillis() - tokenFileStatus.getModificationTime() < tokenOvertime
+                        && tokenFileStatus.getLen() < 1024) {

Review comment:
       Please extract this magic value into a named constant.
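The reviewer's suggestion could be sketched as follows. This is a hypothetical refactor, not code from the PR: the constant names (`MINUTE_MS`, `MAX_TOKEN_FILE_LENGTH`) and the helper method are illustrative stand-ins for the literals used inline in `hasToken`.

```java
public class TokenConstants {

    // Hypothetical names; the PR uses these values as inline literals.
    public static final long MINUTE_MS = 60L * 1000;
    // Maximum token file size before the token is considered invalid.
    public static final long MAX_TOKEN_FILE_LENGTH = 1024L;

    /**
     * Returns true when the token is still held: the token file was modified
     * within the overtime window and is below the size limit.
     */
    public static boolean isTokenHeld(long nowMs, long modificationTimeMs,
            long tokenOvertimeMinute, long fileLength) {
        long tokenOvertimeMs = tokenOvertimeMinute * MINUTE_MS;
        return nowMs - modificationTimeMs < tokenOvertimeMs
                && fileLength < MAX_TOKEN_FILE_LENGTH;
    }
}
```

With named constants, the condition in `hasToken` reads as a policy ("within overtime, under size limit") rather than raw arithmetic.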




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#issuecomment-1008606601


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2129](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (fcfaa49) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/e1235ad7dd7f4880a9d87cbe750e1d9c3f3bf9d3?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e1235ad) will **not change** coverage.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2129/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master    #2129   +/-   ##
   =========================================
     Coverage     12.58%   12.58%           
     Complexity     1206     1206           
   =========================================
     Files           417      417           
     Lines         35735    35735           
     Branches       5590     5590           
   =========================================
     Hits           4497     4497           
     Misses        30435    30435           
     Partials        803      803           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [e1235ad...fcfaa49](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r785598866



##########
File path: inlong-sort-standalone/pom.xml
##########
@@ -51,6 +51,9 @@
         <compiler.source>1.8</compiler.source>
         <compiler.target>1.8</compiler.target>
         <pulsar.version>2.7.2</pulsar.version>
+		<hadoop.version>2.10.1</hadoop.version>
+		<hive.version>2.3.9</hive.version>
+		<fastjson.version>1.2.79</fastjson.version>

Review comment:
       fixed





[GitHub] [incubator-inlong] dockerzhang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r785587272



##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(

Review comment:
       What does this table name mean? It would be better to use a more meaningful name.

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_09c00014434(

Review comment:
       ditto

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(

Review comment:
       ditto

##########
File path: inlong-sort-standalone/pom.xml
##########
@@ -51,6 +51,9 @@
         <compiler.source>1.8</compiler.source>
         <compiler.target>1.8</compiler.target>
         <pulsar.version>2.7.2</pulsar.version>
+		<hadoop.version>2.10.1</hadoop.version>
+		<hive.version>2.3.9</hive.version>
+		<fastjson.version>1.2.79</fastjson.version>

Review comment:
       These three lines should be aligned with the surrounding properties.

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_09c00014434(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_0c900035509(

Review comment:
       ditto

##########
File path: inlong-sort-standalone/pom.xml
##########
@@ -113,6 +116,71 @@
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
+		<dependency>

Review comment:
       The dependency section should keep the same formatting as the other dependencies.

##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    private static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool

Review comment:
       Please remove all unnecessary comments.

##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HdfsIdConfig.java
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 
+ * HdfsIdConfig
+ */
+public class HdfsIdConfig {
+
+    public static final String PATTERN_DAY = "{yyyyMMdd}";
+    public static final String PATTERN_HOUR = "{yyyyMMddHH}";
+    public static final String PATTERN_MINUTE = "{yyyyMMddHHmm}";
+    public static final String REGEX_DAY = "\\{yyyyMMdd\\}";
+    public static final String REGEX_HOUR = "\\{yyyyMMddHH\\}";
+    public static final String REGEX_MINUTE = "\\{yyyyMMddHHmm\\}";
+    public static final long HOUR_MS = 60L * 60 * 1000;
+    public static final int SEPARATOR_LENGTH = 1;
+    private static ThreadLocal<SimpleDateFormat> FORMAT_DAY = new ThreadLocal<SimpleDateFormat>() {
+
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMdd");
+        }
+    };
+    private static ThreadLocal<SimpleDateFormat> FORMAT_HOUR = new ThreadLocal<SimpleDateFormat>() {
+
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHH");
+        }
+    };
+    private static ThreadLocal<SimpleDateFormat> FORMAT_MINUTE = new ThreadLocal<SimpleDateFormat>() {
+
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHHmm");
+        }
+    };
+    // format repository
+    private static ThreadLocal<Map<String, SimpleDateFormat>> FORMAT_REPOSITORY;
+
+    static {
+        FORMAT_REPOSITORY = new ThreadLocal<Map<String, SimpleDateFormat>>() {
+
+            protected Map<String, SimpleDateFormat> initialValue() {
+                return new ConcurrentHashMap<String, SimpleDateFormat>();
+            }
+        };
+    }
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String separator = "|";
+    private long partitionIntervalMs = 60 * 60 * 1000;
+    // path
+    private String idRootPath;
+    private String partitionSubPath;
+    // hive partition
+    private String hiveTableName;
+    private String partitionFieldName = "dt";
+    private String partitionFieldPattern;
+    private String msgTimeFieldPattern;
+    // close partition
+    private long maxPartitionOpenDelayHour = 8;
+
+    /**
+     * get inlongGroupId
+     * 
+     * @return the inlongGroupId
+     */
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    /**
+     * set inlongGroupId
+     * 
+     * @param inlongGroupId the inlongGroupId to set
+     */
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    /**
+     * get inlongStreamId
+     * 
+     * @return the inlongStreamId
+     */
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    /**
+     * set inlongStreamId
+     * 
+     * @param inlongStreamId the inlongStreamId to set
+     */
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    /**
+     * get separator
+     * 
+     * @return the separator
+     */
+    public String getSeparator() {
+        return separator;
+    }
+
+    /**
+     * set separator
+     * 
+     * @param separator the separator to set
+     */
+    public void setSeparator(String separator) {
+        this.separator = separator;
+    }
+
+    /**
+     * get partitionIntervalMs
+     * 
+     * @return the partitionIntervalMs
+     */
+    public long getPartitionIntervalMs() {
+        return partitionIntervalMs;
+    }
+
+    /**
+     * set partitionIntervalMs
+     * 
+     * @param partitionIntervalMs the partitionIntervalMs to set
+     */
+    public void setPartitionIntervalMs(long partitionIntervalMs) {
+        this.partitionIntervalMs = partitionIntervalMs;
+    }
+
+    /**
+     * get idRootPath
+     * 
+     * @return the idRootPath
+     */
+    public String getIdRootPath() {
+        return idRootPath;
+    }
+
+    /**
+     * set idRootPath
+     * 
+     * @param idRootPath the idRootPath to set
+     */
+    public void setIdRootPath(String idRootPath) {
+        this.idRootPath = idRootPath;
+    }
+
+    /**
+     * get partitionSubPath
+     * 
+     * @return the partitionSubPath
+     */
+    public String getPartitionSubPath() {
+        return partitionSubPath;
+    }
+
+    /**
+     * set partitionSubPath
+     * 
+     * @param partitionSubPath the partitionSubPath to set
+     */
+    public void setPartitionSubPath(String partitionSubPath) {
+        this.partitionSubPath = partitionSubPath;
+    }
+
+    /**
+     * get partitionFieldName
+     * 
+     * @return the partitionFieldName
+     */
+    public String getPartitionFieldName() {
+        return partitionFieldName;
+    }
+
+    /**
+     * set partitionFieldName
+     * 
+     * @param partitionFieldName the partitionFieldName to set
+     */
+    public void setPartitionFieldName(String partitionFieldName) {
+        this.partitionFieldName = partitionFieldName;
+    }
+
+    /**
+     * get partitionFieldPattern
+     * 
+     * @return the partitionFieldPattern
+     */
+    public String getPartitionFieldPattern() {
+        partitionFieldPattern = (partitionFieldPattern == null) ? "yyyyMMddHH" : partitionFieldPattern;
+        return partitionFieldPattern;
+    }
+
+    /**
+     * set partitionFieldPattern
+     * 
+     * @param partitionFieldPattern the partitionFieldPattern to set
+     */
+    public void setPartitionFieldPattern(String partitionFieldPattern) {
+        this.partitionFieldPattern = partitionFieldPattern;
+    }
+
+    /**
+     * get msgTimeFieldPattern
+     * 
+     * @return the msgTimeFieldPattern
+     */
+    public String getMsgTimeFieldPattern() {
+        msgTimeFieldPattern = (msgTimeFieldPattern == null) ? "yyyy-MM-dd HH:mm:ss" : msgTimeFieldPattern;
+        return msgTimeFieldPattern;
+    }
+
+    /**
+     * set msgTimeFieldPattern
+     * 
+     * @param msgTimeFieldPattern the msgTimeFieldPattern to set
+     */
+    public void setMsgTimeFieldPattern(String msgTimeFieldPattern) {
+        this.msgTimeFieldPattern = msgTimeFieldPattern;
+    }
+
+    /**
+     * get maxPartitionOpenDelayHour
+     * 
+     * @return the maxPartitionOpenDelayHour
+     */
+    public long getMaxPartitionOpenDelayHour() {
+        return maxPartitionOpenDelayHour;
+    }
+
+    /**
+     * set maxPartitionOpenDelayHour
+     * 
+     * @param maxPartitionOpenDelayHour the maxPartitionOpenDelayHour to set
+     */
+    public void setMaxPartitionOpenDelayHour(long maxPartitionOpenDelayHour) {
+        this.maxPartitionOpenDelayHour = maxPartitionOpenDelayHour;
+    }
+
+    /**
+     * get hiveTableName
+     * 
+     * @return the hiveTableName
+     */
+    public String getHiveTableName() {
+        return hiveTableName;
+    }
+
+    /**
+     * set hiveTableName
+     * 
+     * @param hiveTableName the hiveTableName to set
+     */
+    public void setHiveTableName(String hiveTableName) {
+        this.hiveTableName = hiveTableName;
+    }
+
+    /**
+     * parsePartitionPath
+     * 
+     * @param  msgTime
+     * @return
+     */
+    public String parsePartitionPath(long msgTime) {
+        Date dtDate = new Date(msgTime - msgTime % partitionIntervalMs);
+        String result = partitionSubPath;
+        if (result.indexOf(PATTERN_MINUTE) >= 0) {
+            String strHour = FORMAT_MINUTE.get().format(dtDate);
+            result = result.replaceAll(REGEX_MINUTE, strHour);
+        }
+        if (result.indexOf(PATTERN_HOUR) >= 0) {
+            String strHour = FORMAT_HOUR.get().format(dtDate);
+            result = result.replaceAll(REGEX_HOUR, strHour);
+        }
+        if (result.indexOf(PATTERN_DAY) >= 0) {
+            String strHour = FORMAT_DAY.get().format(dtDate);
+            result = result.replaceAll(REGEX_DAY, strHour);
+        }
+        return idRootPath + result;
+    }
+
+    /**
+     * parsePartitionField
+     * 
+     * @param  msgTime
+     * @return
+     */
+    public String parsePartitionField(long msgTime) {
+        SimpleDateFormat format = FORMAT_REPOSITORY.get().get(partitionFieldPattern);
+        if (format == null) {
+            format = new SimpleDateFormat(this.partitionFieldPattern);
+            FORMAT_REPOSITORY.get().put(partitionFieldPattern, format);
+        }
+        long formatTime = msgTime - msgTime % partitionIntervalMs;
+        return format.format(new Date(formatTime));
+    }
+
+    /**
+     * parseMsgTimeField
+     * 
+     * @param  msgTime
+     * @return
+     */
+    public String parseMsgTimeField(long msgTime) {
+        SimpleDateFormat format = FORMAT_REPOSITORY.get().get(msgTimeFieldPattern);
+        if (format == null) {
+            format = new SimpleDateFormat(this.msgTimeFieldPattern);
+            FORMAT_REPOSITORY.get().put(msgTimeFieldPattern, format);
+        }
+        return format.format(new Date(msgTime));
+
+    }
+
+//    public static void main(String[] args) {

Review comment:
       It would be better to remove the commented-out main method.





[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783740950



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/DefaultEventFormatHandler.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+/**
+ * 
+ * DefaultEventFormatHandler
+ */
+public class DefaultEventFormatHandler implements IEventFormatHandler {
+
+    /**
+     * format
+     * 
+     * @param  event
+     * @param  idConfig
+     * @return
+     */
+    @Override
+    public byte[] format(ProfileEvent event, HdfsIdConfig idConfig) {
+        long msgTime = event.getRawLogTime();
+        String partitionFieldValue = idConfig.parsePartitionField(msgTime);
+        String msgTimeFieldValue = idConfig.parseMsgTimeField(msgTime);
+        byte[] partitionFieldBytes = partitionFieldValue.getBytes();
+        byte[] msgTimeFieldBytes = msgTimeFieldValue.getBytes();
+        byte[] formatBytes = new byte[partitionFieldBytes.length + 1 + msgTimeFieldBytes.length + 1

Review comment:
       Please extract this magic value into a named constant.
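A possible shape for this fix: replace the literal `1` in the length arithmetic with a named separator-length constant. This is a simplified sketch, not the PR's actual code; `EventFormatter` and its method signature are hypothetical (the real method takes a `ProfileEvent` and `HdfsIdConfig`), though `SEPARATOR_LENGTH = 1` does already exist in `HdfsIdConfig`.

```java
public class EventFormatter {

    // Named constants replacing the inline literals in the length arithmetic.
    public static final int SEPARATOR_LENGTH = 1;
    public static final byte SEPARATOR = (byte) '|';

    /**
     * Joins the partition field, message-time field and body with
     * single-byte separators, mirroring the buffer layout in format().
     */
    public static byte[] join(byte[] partitionField, byte[] msgTimeField, byte[] body) {
        byte[] result = new byte[partitionField.length + SEPARATOR_LENGTH
                + msgTimeField.length + SEPARATOR_LENGTH + body.length];
        int pos = 0;
        System.arraycopy(partitionField, 0, result, pos, partitionField.length);
        pos += partitionField.length;
        result[pos] = SEPARATOR;
        pos += SEPARATOR_LENGTH;
        System.arraycopy(msgTimeField, 0, result, pos, msgTimeField.length);
        pos += msgTimeField.length;
        result[pos] = SEPARATOR;
        pos += SEPARATOR_LENGTH;
        System.arraycopy(body, 0, result, pos, body.length);
        return result;
    }
}
```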





[GitHub] [incubator-inlong] imvan commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781813748



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
##########
@@ -75,19 +75,22 @@ public void addEvent(ProfileEvent event) {
             this.needOutputOvertimeData.set(false);
         }
         // parse
-        String uid = event.getUid();
+        String eventUid = event.getUid();
+        long dispatchTime = event.getRawLogTime() - event.getRawLogTime() % 60000L;

Review comment:
       This is a magic number; please extract it into a named constant.
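The flagged `60000L` floors the raw log time to the start of its minute. One way to address the comment, sketched here with a hypothetical class and constant name (the PR may choose different names or reuse an existing constant):

```java
public class DispatchTime {

    // Hypothetical constant replacing the literal 60000L flagged in review.
    public static final long MINUTE_MS = 60L * 1000;

    /** Floors a timestamp to the start of its minute, as DispatchManager does inline. */
    public static long floorToMinute(long rawLogTime) {
        return rawLogTime - rawLogTime % MINUTE_MS;
    }
}
```

Naming the constant also makes the rounding granularity a single point of change if the dispatch window ever moves off one minute.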





[GitHub] [incubator-inlong] imvan commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781858341



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007
+                LOG.error("can not find uid:{},idConfigMap:{}", uid, JSON.toJSONString(context.getIdConfigMap()));
+                this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                dispatchProfile = this.dispatchQueue.poll();
+                continue;
+            }
+            String strIdRootPath = idConfig.parsePartitionPath(dispatchProfile.getDispatchTime());
+            HdfsIdFile idFile = this.hdfsIdFileMap.get(strIdRootPath);
+            if (idFile == null) {
+                try {
+                    idFile = new HdfsIdFile(context, idConfig, strIdRootPath);
+                } catch (Exception e) {
+                    // monitor 007
+                    LOG.error("can not connect to hdfsPath:{},write file:{}", context.getHdfsPath(), strIdRootPath);
+                    this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                    dispatchProfile = this.dispatchQueue.poll();
+                    continue;
+                }
+                this.hdfsIdFileMap.put(strIdRootPath, idFile);
+            }
+            idFile.setModifiedTime(currentTime);
+            // new runnable
+            WriteHdfsFileRunnable writeTask = new WriteHdfsFileRunnable(context, idFile, dispatchProfile);
+            context.getOutputPool().execute(writeTask);
+            dispatchProfile = this.dispatchQueue.poll();
+        }
+        // close overtime file
+        long overtime = currentTime - context.getFileArchiveDelayMinute() * HiveSinkContext.MINUTE_MS;
+        Set<String> overtimePathSet = new HashSet<>();
+        for (Entry<String, HdfsIdFile> entry : this.hdfsIdFileMap.entrySet()) {
+            if (entry.getValue().getModifiedTime() < overtime) {
+                overtimePathSet.add(entry.getKey());
+                entry.getValue().close();
+            }
+        }
+        // remove key
+        overtimePathSet.forEach((item) -> this.hdfsIdFileMap.remove(item));

Review comment:
       why not remove it in line 211
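A hedged sketch of the alternative the reviewer suggests: removing each overtime entry directly through the iterator while closing it, instead of collecting keys into a second set and removing them afterwards. `ConcurrentHashMap` iterators support `remove()`, so a single pass suffices; the `IdFile` stand-in below is illustrative.

```java
import java.util.Iterator;
import java.util.Map;

public class OvertimeCleanupExample {

    public static class IdFile {
        final long modifiedTime;
        boolean closed;
        public IdFile(long modifiedTime) { this.modifiedTime = modifiedTime; }
        public void close() { this.closed = true; }
    }

    /** Close and remove all entries older than the overtime threshold in one pass. */
    public static void closeOvertime(Map<String, IdFile> files, long overtime) {
        Iterator<Map.Entry<String, IdFile>> it = files.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, IdFile> entry = it.next();
            if (entry.getValue().modifiedTime < overtime) {
                entry.getValue().close();
                it.remove(); // supported by ConcurrentHashMap iterators
            }
        }
    }
}
```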







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781871589



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007
+                LOG.error("can not find uid:{},idConfigMap:{}", uid, JSON.toJSONString(context.getIdConfigMap()));
+                this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                dispatchProfile = this.dispatchQueue.poll();
+                continue;
+            }
+            String strIdRootPath = idConfig.parsePartitionPath(dispatchProfile.getDispatchTime());
+            HdfsIdFile idFile = this.hdfsIdFileMap.get(strIdRootPath);
+            if (idFile == null) {
+                try {
+                    idFile = new HdfsIdFile(context, idConfig, strIdRootPath);
+                } catch (Exception e) {
+                    // monitor 007
+                    LOG.error("can not connect to hdfsPath:{},write file:{}", context.getHdfsPath(), strIdRootPath);
+                    this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                    dispatchProfile = this.dispatchQueue.poll();

Review comment:
       Here another message is polled; will it cause the loss of the previous message?
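One hedged way to address the concern raised here: when the HDFS file cannot be opened, re-queue the failed profile with a bounded retry budget instead of silently moving on to the next one. This is a sketch under assumptions; the `Profile` type, retry counter, and limit are illustrative and not part of the PR.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class RetryOnOpenFailureExample {

    public static class Profile {
        final String uid;
        int retries;
        public Profile(String uid) { this.uid = uid; }
    }

    // Illustrative retry budget before the profile is reported as failed and dropped.
    public static final int MAX_RETRIES = 3;

    /**
     * Put the failed profile back on the dispatch queue until its retry
     * budget is exhausted; return false once it must be dropped.
     */
    public static boolean requeueOrDrop(LinkedBlockingQueue<Profile> queue, Profile failed) {
        if (failed.retries < MAX_RETRIES) {
            failed.retries++;
            return queue.offer(failed); // retry on a later writeHdfsFile() pass
        }
        return false; // budget exhausted: emit the failure metric and drop
    }
}
```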







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r785599137



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    private static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool

Review comment:
       These comments are useful.








[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783990698



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007

Review comment:
       remove 007







[GitHub] [incubator-inlong] dockerzhang merged pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
dockerzhang merged pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129


   





[GitHub] [incubator-inlong] imvan commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781816254



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/DefaultEventFormatHandler.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+/**
+ * 
+ * DefaultEventFormatHandler
+ */
+public class DefaultEventFormatHandler implements IEventFormatHandler {
+
+    /**
+     * format
+     * 
+     * @param  event
+     * @param  idConfig
+     * @return
+     */
+    @Override
+    public byte[] format(ProfileEvent event, HdfsIdConfig idConfig) {
+        long msgTime = event.getRawLogTime();
+        String partitionFieldValue = idConfig.parsePartitionField(msgTime);
+        String msgTimeFieldValue = idConfig.parseMsgTimeField(msgTime);
+        byte[] partitionFieldBytes = partitionFieldValue.getBytes();
+        byte[] msgTimeFieldBytes = msgTimeFieldValue.getBytes();
+        byte[] formatBytes = new byte[partitionFieldBytes.length + 1 + msgTimeFieldBytes.length + 1

Review comment:
       Ditto: it is hard to figure out what the `1` means here.
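
   One way to address this review point is to give the one-byte gaps a named constant and document the record layout next to it. A minimal sketch under assumed names (`SEPARATOR_LEN`, `recordLength` are illustrative, not the PR's actual identifiers):

   ```java
   public class FormatLength {

       // Record layout: partition SEP msgTime SEP body NEWLINE
       // Each SEP and the trailing NEWLINE occupy one byte.
       private static final int SEPARATOR_LEN = 1;

       public static int recordLength(byte[] partition, byte[] msgTime, byte[] body) {
           return partition.length + SEPARATOR_LEN
                   + msgTime.length + SEPARATOR_LEN
                   + body.length + SEPARATOR_LEN; // trailing newline
       }
   }
   ```

   With the layout spelled out in one comment, each `+ SEPARATOR_LEN` term is self-explanatory.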







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781865193



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007

Review comment:
       This comment is not suitable; it would be better to delete the "007".







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781818003



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/DefaultEventFormatHandler.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+/**
+ * 
+ * DefaultEventFormatHandler
+ */
+public class DefaultEventFormatHandler implements IEventFormatHandler {
+
+    /**
+     * format
+     * 
+     * @param  event
+     * @param  idConfig
+     * @return
+     */
+    @Override
+    public byte[] format(ProfileEvent event, HdfsIdConfig idConfig) {
+        long msgTime = event.getRawLogTime();
+        String partitionFieldValue = idConfig.parsePartitionField(msgTime);
+        String msgTimeFieldValue = idConfig.parseMsgTimeField(msgTime);
+        byte[] partitionFieldBytes = partitionFieldValue.getBytes();
+        byte[] msgTimeFieldBytes = msgTimeFieldValue.getBytes();
+        byte[] formatBytes = new byte[partitionFieldBytes.length + 1 + msgTimeFieldBytes.length + 1

Review comment:
       It would be better to explain what the protocol consists of, instead of initializing the array length with multiple `+1`s per message.
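
   A sketch of what "explain the protocol" could look like in practice: build the record through a stream with a named separator byte, so the layout is stated once instead of being implied by arithmetic. The field names and the `'|'` separator are assumptions for illustration, not the PR's actual wire format:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.nio.charset.StandardCharsets;

   public class RecordFormatter {

       // Record layout: partitionField SEP msgTimeField SEP body
       private static final byte SEPARATOR = (byte) '|';

       public static byte[] format(String partitionField, String msgTimeField, byte[] body) {
           byte[] partitionBytes = partitionField.getBytes(StandardCharsets.UTF_8);
           byte[] msgTimeBytes = msgTimeField.getBytes(StandardCharsets.UTF_8);
           ByteArrayOutputStream out = new ByteArrayOutputStream(
                   partitionBytes.length + msgTimeBytes.length + body.length + 2);
           out.write(partitionBytes, 0, partitionBytes.length);
           out.write(SEPARATOR);
           out.write(msgTimeBytes, 0, msgTimeBytes.length);
           out.write(SEPARATOR);
           out.write(body, 0, body.length);
           return out.toByteArray();
       }
   }
   ```

   `ByteArrayOutputStream` sizes itself, so no manual length bookkeeping is needed; the initial capacity hint is only an optimization.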







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783742019



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HdfsIdConfig.java
##########
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 
+ * HdfsIdConfig
+ */
+public class HdfsIdConfig {
+
+    public static final String PATTERN_DAY = "{yyyyMMdd}";
+    public static final String PATTERN_HOUR = "{yyyyMMddHH}";
+    public static final String PATTERN_MINUTE = "{yyyyMMddHHmm}";
+    public static final String REGEX_DAY = "\\{yyyyMMdd\\}";
+    public static final String REGEX_HOUR = "\\{yyyyMMddHH\\}";
+    public static final String REGEX_MINUTE = "\\{yyyyMMddHHmm\\}";
+    public static final long HOUR_MS = 60 * 60 * 1000;

Review comment:
       Add an "L" suffix at the end so the literal is evaluated as a long.
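
   The point of the "L" suffix is overflow safety in later arithmetic: `60 * 60 * 1000` itself fits in an int, but without the suffix any expression built on top of it is evaluated in int arithmetic first and only then widened. A minimal sketch of the failure mode (constant names are illustrative):

   ```java
   public class LongLiteralDemo {

       public static final long HOUR_MS = 60 * 60 * 1000L; // long arithmetic throughout

       public static void main(String[] args) {
           int days = 30;
           // All-int multiplication: 2_592_000_000 exceeds Integer.MAX_VALUE,
           // wraps around, and only the wrapped value is widened to long.
           long wrong = days * 24 * 60 * 60 * 1000;
           // One long operand promotes the whole expression to long.
           long right = days * 24 * HOUR_MS;
           System.out.println(wrong == right); // false: wrong overflowed
       }
   }
   ```

   The same reasoning applies to expressions like `maxPartitionOpenDelayHour * HOUR_MS` elsewhere in this PR: one long operand is enough to keep the whole product in long arithmetic.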







[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#issuecomment-1008606601


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2129](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ab99d46) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/f51179a30f75a910f3d17e6192195753ed83f31a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f51179a) will **not change** coverage.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2129/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master    #2129   +/-   ##
   =========================================
     Coverage     12.26%   12.26%           
     Complexity     1159     1159           
   =========================================
     Files           413      413           
     Lines         35215    35215           
     Branches       5542     5542           
   =========================================
     Hits           4320     4320           
     Misses        30125    30125           
     Partials        770      770           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f51179a...ab99d46](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r784014795



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);

Review comment:
       Change the logger field to private.







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r783755234



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/PartitionLeaderElectionRunnable.java
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * PartitionLeaderElectionRunnable
+ */
+public class PartitionLeaderElectionRunnable implements Runnable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSinkContext.class);
+    private final HiveSinkContext context;
+    private Map<String, PartitionCreateRunnable> partitionCreateMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     * 
+     * @param context
+     */
+    public PartitionLeaderElectionRunnable(HiveSinkContext context) {
+        this.context = context;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        // clear stopped runnable
+        Set<String> uidPartitions = new HashSet<>();
+        for (Entry<String, PartitionCreateRunnable> entry : this.partitionCreateMap.entrySet()) {
+            if (entry.getValue().getState() != PartitionState.INIT
+                    && entry.getValue().getState() != PartitionState.CREATING) {
+                uidPartitions.add(entry.getKey());
+            }
+        }
+        uidPartitions.forEach(item -> this.partitionCreateMap.remove(item));
+        // add runnable
+        try (Connection conn = context.getHiveConnection()) {
+            Map<String, HdfsIdConfig> idConfigMap = context.getIdConfigMap();
+            ExecutorService partitionCreatePool = context.getPartitionCreatePool();
+            Statement stat = conn.createStatement();
+            for (Entry<String, HdfsIdConfig> entry : idConfigMap.entrySet()) {
+                if (hasToken(entry.getValue())) {
+                    HdfsIdConfig idConfig = entry.getValue();
+                    // get partition list of table
+                    String tableName = idConfig.getHiveTableName();
+                    ResultSet rs = stat.executeQuery("show partitions " + tableName);
+                    Set<String> partitionSet = new HashSet<>();
+                    while (rs.next()) {
+                        String strPartition = rs.getString(1);
+                        int index = strPartition.indexOf('=');
+                        if (index < 0) {
+                            continue;
+                        }
+                        partitionSet.add(strPartition.substring(index + 1));
+                    }
+                    rs.close();
+                    // close partition
+                    long currentTime = System.currentTimeMillis();
+                    long beginScanTime = currentTime
+                            - 2 * idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;
+                    long endScanTime = currentTime - idConfig.getPartitionIntervalMs()
+                            - context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS;
+                    long forceCloseTime = currentTime - idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS;

Review comment:
       These three time variables need three separate methods; the logic should not be repeated inline.
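
   A sketch of the refactor being described: each cut-off timestamp gets its own named method so the arithmetic is written once and can be tested in isolation. Class and method names here are assumptions for illustration, not the PR's final code:

   ```java
   public class PartitionTimes {

       private static final long HOUR_MS = 60 * 60 * 1000L;

       private final long maxPartitionOpenDelayHour;
       private final long partitionIntervalMs;
       private final long maxFileOpenDelayMs;

       public PartitionTimes(long maxPartitionOpenDelayHour, long partitionIntervalMs,
               long maxFileOpenDelayMs) {
           this.maxPartitionOpenDelayHour = maxPartitionOpenDelayHour;
           this.partitionIntervalMs = partitionIntervalMs;
           this.maxFileOpenDelayMs = maxFileOpenDelayMs;
       }

       /** Earliest partition time to scan: twice the max open delay back from now. */
       public long beginScanTime(long now) {
           return now - 2 * maxPartitionOpenDelayHour * HOUR_MS;
       }

       /** Latest partition time eligible for closing. */
       public long endScanTime(long now) {
           return now - partitionIntervalMs - maxFileOpenDelayMs;
       }

       /** Partitions older than this are closed unconditionally. */
       public long forceCloseTime(long now) {
           return now - maxPartitionOpenDelayHour * HOUR_MS;
       }
   }
   ```

   The election loop would then call `beginScanTime(currentTime)` etc., keeping the scan logic free of duplicated time arithmetic.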







[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r781864270



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool
+    // partition leader election runnable
+    // reload
+    // dispatch
+    private ScheduledExecutorService scheduledPool;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        try {
+            this.context = new HiveSinkContext(getName(), parentContext, getChannel(), this.dispatchQueue);
+            if (getChannel() == null) {
+                LOG.error("channel is null");
+            }
+            this.context.start();
+            this.dispatchManager = new DispatchManager(parentContext, dispatchQueue);
+            this.scheduledPool = Executors.newScheduledThreadPool(2);
+            // dispatch
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // partition leader election runnable
+            this.scheduledPool.scheduleWithFixedDelay(new PartitionLeaderElectionRunnable(context),
+                    0, this.context.getMaxFileOpenDelayMinute() * HiveSinkContext.MINUTE_MS,
+                    TimeUnit.MILLISECONDS);
+            // process
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    writeHdfsFile();
+                }
+            }, this.context.getProcessInterval(), this.context.getProcessInterval(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        try {
+            this.context.close();
+            this.scheduledPool.shutdown();
+            super.stop();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.parentContext = context;
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (!(event instanceof ProfileEvent)) {
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+            //
+            ProfileEvent profileEvent = (ProfileEvent) event;
+            this.dispatchManager.addEvent(profileEvent);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * writeHdfsFile
+     */
+    private void writeHdfsFile() {
+        // write file
+        long currentTime = System.currentTimeMillis();
+        DispatchProfile dispatchProfile = this.dispatchQueue.poll();
+        while (dispatchProfile != null) {
+            String uid = dispatchProfile.getUid();
+            HdfsIdConfig idConfig = context.getIdConfigMap().get(uid);
+            if (idConfig == null) {
+                // monitor 007
+                LOG.error("can not find uid:{},idConfigMap:{}", uid, JSON.toJSONString(context.getIdConfigMap()));
+                this.context.addSendResultMetric(dispatchProfile, uid, false, 0);
+                dispatchProfile = this.dispatchQueue.poll();
+                continue;
+            }
+            String strIdRootPath = idConfig.parsePartitionPath(dispatchProfile.getDispatchTime());
+            HdfsIdFile idFile = this.hdfsIdFileMap.get(strIdRootPath);
+            if (idFile == null) {
+                try {
+                    idFile = new HdfsIdFile(context, idConfig, strIdRootPath);
+                } catch (Exception e) {
+                    // monitor 007

Review comment:
       ditto
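The `process()` method in the diff above follows the standard Flume sink transaction contract: take one event inside a transaction, commit on success (or on an empty take), roll back on failure, and always close the transaction. A minimal standalone sketch of that control flow, using hypothetical stub types in place of Flume's `Channel`/`Transaction`, might look like:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the Flume-style sink transaction pattern.
// The stub Channel/Transaction types are assumptions, not Flume's API.
public class SinkProcessSketch {
    enum Status { READY, BACKOFF }

    static class Transaction {
        void begin() { }
        void commit() { }
        void rollback() { }
        void close() { }
    }

    static class Channel {
        final Queue<String> events = new ArrayDeque<>();
        Transaction getTransaction() { return new Transaction(); }
        String take() { return events.poll(); }
    }

    static Status process(Channel channel) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            String event = channel.take();
            if (event == null) {
                // Empty channel: commit the empty take and ask the runner to back off.
                tx.commit();
                return Status.BACKOFF;
            }
            // Dispatch the event, then commit so the channel can release it.
            tx.commit();
            return Status.READY;
        } catch (Throwable t) {
            // On any failure the take is rolled back so the event is redelivered.
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.events.add("event-1");
        System.out.println(process(channel)); // READY while events remain
        System.out.println(process(channel)); // BACKOFF once the channel is empty
    }
}
```

The key property is that `commit`/`rollback` happen before `close`, and `close` runs in `finally` so the transaction is released on every path.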




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r785598956



##########
File path: inlong-sort-standalone/pom.xml
##########
@@ -113,6 +116,71 @@
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
+		<dependency>

Review comment:
       fixed







[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r785599309



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HdfsIdConfig.java
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 
+ * HdfsIdConfig
+ */
+public class HdfsIdConfig {
+
+    public static final String PATTERN_DAY = "{yyyyMMdd}";
+    public static final String PATTERN_HOUR = "{yyyyMMddHH}";
+    public static final String PATTERN_MINUTE = "{yyyyMMddHHmm}";
+    public static final String REGEX_DAY = "\\{yyyyMMdd\\}";
+    public static final String REGEX_HOUR = "\\{yyyyMMddHH\\}";
+    public static final String REGEX_MINUTE = "\\{yyyyMMddHHmm\\}";
+    public static final long HOUR_MS = 60L * 60 * 1000;
+    public static final int SEPARATOR_LENGTH = 1;
+    private static final ThreadLocal<SimpleDateFormat> FORMAT_DAY = ThreadLocal
+            .withInitial(() -> new SimpleDateFormat("yyyyMMdd"));
+    private static final ThreadLocal<SimpleDateFormat> FORMAT_HOUR = ThreadLocal
+            .withInitial(() -> new SimpleDateFormat("yyyyMMddHH"));
+    private static final ThreadLocal<SimpleDateFormat> FORMAT_MINUTE = ThreadLocal
+            .withInitial(() -> new SimpleDateFormat("yyyyMMddHHmm"));
+    // format repository
+    private static final ThreadLocal<Map<String, SimpleDateFormat>> FORMAT_REPOSITORY = ThreadLocal
+            .withInitial(() -> new ConcurrentHashMap<>());
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String separator = "|";
+    private long partitionIntervalMs = 60 * 60 * 1000;
+    // path
+    private String idRootPath;
+    private String partitionSubPath;
+    // hive partition
+    private String hiveTableName;
+    private String partitionFieldName = "dt";
+    private String partitionFieldPattern;
+    private String msgTimeFieldPattern;
+    // close partition
+    private long maxPartitionOpenDelayHour = 8;
+
+    /**
+     * get inlongGroupId
+     * 
+     * @return the inlongGroupId
+     */
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    /**
+     * set inlongGroupId
+     * 
+     * @param inlongGroupId the inlongGroupId to set
+     */
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    /**
+     * get inlongStreamId
+     * 
+     * @return the inlongStreamId
+     */
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    /**
+     * set inlongStreamId
+     * 
+     * @param inlongStreamId the inlongStreamId to set
+     */
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    /**
+     * get separator
+     * 
+     * @return the separator
+     */
+    public String getSeparator() {
+        return separator;
+    }
+
+    /**
+     * set separator
+     * 
+     * @param separator the separator to set
+     */
+    public void setSeparator(String separator) {
+        this.separator = separator;
+    }
+
+    /**
+     * get partitionIntervalMs
+     * 
+     * @return the partitionIntervalMs
+     */
+    public long getPartitionIntervalMs() {
+        return partitionIntervalMs;
+    }
+
+    /**
+     * set partitionIntervalMs
+     * 
+     * @param partitionIntervalMs the partitionIntervalMs to set
+     */
+    public void setPartitionIntervalMs(long partitionIntervalMs) {
+        this.partitionIntervalMs = partitionIntervalMs;
+    }
+
+    /**
+     * get idRootPath
+     * 
+     * @return the idRootPath
+     */
+    public String getIdRootPath() {
+        return idRootPath;
+    }
+
+    /**
+     * set idRootPath
+     * 
+     * @param idRootPath the idRootPath to set
+     */
+    public void setIdRootPath(String idRootPath) {
+        this.idRootPath = idRootPath;
+    }
+
+    /**
+     * get partitionSubPath
+     * 
+     * @return the partitionSubPath
+     */
+    public String getPartitionSubPath() {
+        return partitionSubPath;
+    }
+
+    /**
+     * set partitionSubPath
+     * 
+     * @param partitionSubPath the partitionSubPath to set
+     */
+    public void setPartitionSubPath(String partitionSubPath) {
+        this.partitionSubPath = partitionSubPath;
+    }
+
+    /**
+     * get partitionFieldName
+     * 
+     * @return the partitionFieldName
+     */
+    public String getPartitionFieldName() {
+        return partitionFieldName;
+    }
+
+    /**
+     * set partitionFieldName
+     * 
+     * @param partitionFieldName the partitionFieldName to set
+     */
+    public void setPartitionFieldName(String partitionFieldName) {
+        this.partitionFieldName = partitionFieldName;
+    }
+
+    /**
+     * get partitionFieldPattern
+     * 
+     * @return the partitionFieldPattern
+     */
+    public String getPartitionFieldPattern() {
+        partitionFieldPattern = (partitionFieldPattern == null) ? "yyyyMMddHH" : partitionFieldPattern;
+        return partitionFieldPattern;
+    }
+
+    /**
+     * set partitionFieldPattern
+     * 
+     * @param partitionFieldPattern the partitionFieldPattern to set
+     */
+    public void setPartitionFieldPattern(String partitionFieldPattern) {
+        this.partitionFieldPattern = partitionFieldPattern;
+    }
+
+    /**
+     * get msgTimeFieldPattern
+     * 
+     * @return the msgTimeFieldPattern
+     */
+    public String getMsgTimeFieldPattern() {
+        msgTimeFieldPattern = (msgTimeFieldPattern == null) ? "yyyy-MM-dd HH:mm:ss" : msgTimeFieldPattern;
+        return msgTimeFieldPattern;
+    }
+
+    /**
+     * set msgTimeFieldPattern
+     * 
+     * @param msgTimeFieldPattern the msgTimeFieldPattern to set
+     */
+    public void setMsgTimeFieldPattern(String msgTimeFieldPattern) {
+        this.msgTimeFieldPattern = msgTimeFieldPattern;
+    }
+
+    /**
+     * get maxPartitionOpenDelayHour
+     * 
+     * @return the maxPartitionOpenDelayHour
+     */
+    public long getMaxPartitionOpenDelayHour() {
+        return maxPartitionOpenDelayHour;
+    }
+
+    /**
+     * set maxPartitionOpenDelayHour
+     * 
+     * @param maxPartitionOpenDelayHour the maxPartitionOpenDelayHour to set
+     */
+    public void setMaxPartitionOpenDelayHour(long maxPartitionOpenDelayHour) {
+        this.maxPartitionOpenDelayHour = maxPartitionOpenDelayHour;
+    }
+
+    /**
+     * get hiveTableName
+     * 
+     * @return the hiveTableName
+     */
+    public String getHiveTableName() {
+        return hiveTableName;
+    }
+
+    /**
+     * set hiveTableName
+     * 
+     * @param hiveTableName the hiveTableName to set
+     */
+    public void setHiveTableName(String hiveTableName) {
+        this.hiveTableName = hiveTableName;
+    }
+
+    /**
+     * parsePartitionPath
+     * 
+     * @param  msgTime message time in epoch milliseconds
+     * @return         the partition path resolved from the id root path and sub path patterns
+     */
+    public String parsePartitionPath(long msgTime) {
+        Date dtDate = new Date(msgTime - msgTime % partitionIntervalMs);
+        String result = partitionSubPath;
+        if (result.indexOf(PATTERN_MINUTE) >= 0) {
+            String strMinute = FORMAT_MINUTE.get().format(dtDate);
+            result = result.replaceAll(REGEX_MINUTE, strMinute);
+        }
+        if (result.indexOf(PATTERN_HOUR) >= 0) {
+            String strHour = FORMAT_HOUR.get().format(dtDate);
+            result = result.replaceAll(REGEX_HOUR, strHour);
+        }
+        if (result.indexOf(PATTERN_DAY) >= 0) {
+            String strDay = FORMAT_DAY.get().format(dtDate);
+            result = result.replaceAll(REGEX_DAY, strDay);
+        }
+        return idRootPath + result;
+    }
+
+    /**
+     * parsePartitionField
+     * 
+     * @param  msgTime message time in epoch milliseconds
+     * @return         the partition field value formatted with partitionFieldPattern
+     */
+    public String parsePartitionField(long msgTime) {
+        String pattern = getPartitionFieldPattern();
+        SimpleDateFormat format = FORMAT_REPOSITORY.get().get(pattern);
+        if (format == null) {
+            format = new SimpleDateFormat(pattern);
+            FORMAT_REPOSITORY.get().put(pattern, format);
+        }
+        long formatTime = msgTime - msgTime % partitionIntervalMs;
+        return format.format(new Date(formatTime));
+    }
+
+    /**
+     * parseMsgTimeField
+     * 
+     * @param  msgTime message time in epoch milliseconds
+     * @return         the message time formatted with msgTimeFieldPattern
+     */
+    public String parseMsgTimeField(long msgTime) {
+        String pattern = getMsgTimeFieldPattern();
+        SimpleDateFormat format = FORMAT_REPOSITORY.get().get(pattern);
+        if (format == null) {
+            format = new SimpleDateFormat(pattern);
+            FORMAT_REPOSITORY.get().put(pattern, format);
+        }
+        return format.format(new Date(msgTime));
+    }
+
+//    public static void main(String[] args) {

Review comment:
       remove it.
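The `parsePartitionPath` method in the `HdfsIdConfig` diff above floors the message time to the partition interval and substitutes `{yyyyMMddHH}`-style tokens in the partition sub path. A standalone sketch of that substitution, with hypothetical paths and a fixed UTC time zone for determinism (the original does not pin a zone), might look like:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical sketch of HdfsIdConfig.parsePartitionPath's token substitution.
// The root path, sub path, and timestamp below are illustrative assumptions.
public class PartitionPathSketch {
    static final long PARTITION_INTERVAL_MS = 60L * 60 * 1000; // one hour

    static String parsePartitionPath(String idRootPath, String partitionSubPath, long msgTime) {
        // Floor the message time to the start of its partition interval.
        Date dtDate = new Date(msgTime - msgTime % PARTITION_INTERVAL_MS);
        SimpleDateFormat hourFormat = new SimpleDateFormat("yyyyMMddHH");
        hourFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        // Replace the {yyyyMMddHH} token with the formatted partition time.
        String result = partitionSubPath.replaceAll("\\{yyyyMMddHH\\}", hourFormat.format(dtDate));
        return idRootPath + result;
    }

    public static void main(String[] args) {
        long msgTime = 1641798000000L; // 2022-01-10T07:00:00Z, an illustrative timestamp
        System.out.println(parsePartitionPath("/inlong/group1", "/dt={yyyyMMddHH}", msgTime));
        // → /inlong/group1/dt=2022011007
    }
}
```

Messages arriving anywhere inside the same hour resolve to the same directory, which is what lets the sink accumulate one HDFS file set per partition.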







[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#issuecomment-1008606601


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2129](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (dc2920b) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/f51179a30f75a910f3d17e6192195753ed83f31a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f51179a) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2129/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2129      +/-   ##
   ============================================
   - Coverage     12.26%   12.25%   -0.01%     
     Complexity     1159     1159              
   ============================================
     Files           413      413              
     Lines         35215    35243      +28     
     Branches       5542     5544       +2     
   ============================================
     Hits           4320     4320              
   - Misses        30125    30153      +28     
     Partials        770      770              
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/inlong/tubemq/server/tools/cli/CliConsumer.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci90b29scy9jbGkvQ2xpQ29uc3VtZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...he/inlong/tubemq/server/tools/cli/CliProducer.java](https://codecov.io/gh/apache/incubator-inlong/pull/2129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci90b29scy9jbGkvQ2xpUHJvZHVjZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f51179a...dc2920b](https://codecov.io/gh/apache/incubator-inlong/pull/2129?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] imvan commented on pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

Posted by GitBox <gi...@apache.org>.
imvan commented on pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#issuecomment-1014430551


   +1

