You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/01/17 01:39:58 UTC

[GitHub] [incubator-inlong] dockerzhang commented on a change in pull request #2129: [INLONG-1895] Inlong-Sort-Standalone support to sort the events to Hive cluster.

dockerzhang commented on a change in pull request #2129:
URL: https://github.com/apache/incubator-inlong/pull/2129#discussion_r785587272



##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(

Review comment:
       What does this table name mean? Please use a more meaningful name.

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_09c00014434(

Review comment:
       ditto

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(

Review comment:
       ditto

##########
File path: inlong-sort-standalone/pom.xml
##########
@@ -51,6 +51,9 @@
         <compiler.source>1.8</compiler.source>
         <compiler.target>1.8</compiler.target>
         <pulsar.version>2.7.2</pulsar.version>
+		<hadoop.version>2.10.1</hadoop.version>
+		<hive.version>2.3.9</hive.version>
+		<fastjson.version>1.2.79</fastjson.version>

Review comment:
       These three lines should be aligned with the surrounding properties (they are indented with tabs instead of spaces).

##########
File path: inlong-sort-standalone/conf/hive/hive.sql
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE EXTERNAL TABLE t_inlong_v1_0fc00000046(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_03600000045(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    t1 STRING COMMENT 'default ',
+    t2 STRING COMMENT 'default ',
+    t3 STRING COMMENT 'default ',
+    t4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_05100054990(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_09c00014434(
+    tdbank_imp_date STRING COMMENT 'partition fields',
+    ftime STRING COMMENT 'default ',
+    field1 STRING COMMENT 'default ',
+    field2 STRING COMMENT 'default ',
+    field3 STRING COMMENT 'default ',
+    field4 STRING COMMENT 'default '
+)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '\|'
+STORED AS SEQUENCEFILE;
+CREATE EXTERNAL TABLE t_inlong_v1_0c900035509(

Review comment:
       ditto

##########
File path: inlong-sort-standalone/pom.xml
##########
@@ -113,6 +116,71 @@
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
+		<dependency>

Review comment:
       The dependency section should keep the same format (indentation) as the existing dependencies.

##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * 
+ * HiveSink
+ */
+public class HiveSink extends AbstractSink implements Configurable {
+
+    private static final Logger LOG = InlongLoggerFactory.getLogger(HiveSink.class);
+
+    private Context parentContext;
+    private HiveSinkContext context;
+    // message group
+    private DispatchManager dispatchManager;
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
+    // message file
+    private Map<String, HdfsIdFile> hdfsIdFileMap = new ConcurrentHashMap<>();
+    // scheduled thread pool

Review comment:
       Please remove all unnecessary comments.

##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HdfsIdConfig.java
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.hive;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 
+ * HdfsIdConfig
+ */
+public class HdfsIdConfig {
+
+    public static final String PATTERN_DAY = "{yyyyMMdd}";
+    public static final String PATTERN_HOUR = "{yyyyMMddHH}";
+    public static final String PATTERN_MINUTE = "{yyyyMMddHHmm}";
+    public static final String REGEX_DAY = "\\{yyyyMMdd\\}";
+    public static final String REGEX_HOUR = "\\{yyyyMMddHH\\}";
+    public static final String REGEX_MINUTE = "\\{yyyyMMddHHmm\\}";
+    public static final long HOUR_MS = 60L * 60 * 1000;
+    public static final int SEPARATOR_LENGTH = 1;
+    private static ThreadLocal<SimpleDateFormat> FORMAT_DAY = new ThreadLocal<SimpleDateFormat>() {
+
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMdd");
+        }
+    };
+    private static ThreadLocal<SimpleDateFormat> FORMAT_HOUR = new ThreadLocal<SimpleDateFormat>() {
+
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHH");
+        }
+    };
+    private static ThreadLocal<SimpleDateFormat> FORMAT_MINUTE = new ThreadLocal<SimpleDateFormat>() {
+
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHHmm");
+        }
+    };
+    // format repository
+    private static ThreadLocal<Map<String, SimpleDateFormat>> FORMAT_REPOSITORY;
+
+    static {
+        FORMAT_REPOSITORY = new ThreadLocal<Map<String, SimpleDateFormat>>() {
+
+            protected Map<String, SimpleDateFormat> initialValue() {
+                return new ConcurrentHashMap<String, SimpleDateFormat>();
+            }
+        };
+    }
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String separator = "|";
+    private long partitionIntervalMs = 60 * 60 * 1000;
+    // path
+    private String idRootPath;
+    private String partitionSubPath;
+    // hive partition
+    private String hiveTableName;
+    private String partitionFieldName = "dt";
+    private String partitionFieldPattern;
+    private String msgTimeFieldPattern;
+    // close partition
+    private long maxPartitionOpenDelayHour = 8;
+
+    /**
+     * get inlongGroupId
+     * 
+     * @return the inlongGroupId
+     */
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    /**
+     * set inlongGroupId
+     * 
+     * @param inlongGroupId the inlongGroupId to set
+     */
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    /**
+     * get inlongStreamId
+     * 
+     * @return the inlongStreamId
+     */
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    /**
+     * set inlongStreamId
+     * 
+     * @param inlongStreamId the inlongStreamId to set
+     */
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    /**
+     * get separator
+     * 
+     * @return the separator
+     */
+    public String getSeparator() {
+        return separator;
+    }
+
+    /**
+     * set separator
+     * 
+     * @param separator the separator to set
+     */
+    public void setSeparator(String separator) {
+        this.separator = separator;
+    }
+
+    /**
+     * get partitionIntervalMs
+     * 
+     * @return the partitionIntervalMs
+     */
+    public long getPartitionIntervalMs() {
+        return partitionIntervalMs;
+    }
+
+    /**
+     * set partitionIntervalMs
+     * 
+     * @param partitionIntervalMs the partitionIntervalMs to set
+     */
+    public void setPartitionIntervalMs(long partitionIntervalMs) {
+        this.partitionIntervalMs = partitionIntervalMs;
+    }
+
+    /**
+     * get idRootPath
+     * 
+     * @return the idRootPath
+     */
+    public String getIdRootPath() {
+        return idRootPath;
+    }
+
+    /**
+     * set idRootPath
+     * 
+     * @param idRootPath the idRootPath to set
+     */
+    public void setIdRootPath(String idRootPath) {
+        this.idRootPath = idRootPath;
+    }
+
+    /**
+     * get partitionSubPath
+     * 
+     * @return the partitionSubPath
+     */
+    public String getPartitionSubPath() {
+        return partitionSubPath;
+    }
+
+    /**
+     * set partitionSubPath
+     * 
+     * @param partitionSubPath the partitionSubPath to set
+     */
+    public void setPartitionSubPath(String partitionSubPath) {
+        this.partitionSubPath = partitionSubPath;
+    }
+
+    /**
+     * get partitionFieldName
+     * 
+     * @return the partitionFieldName
+     */
+    public String getPartitionFieldName() {
+        return partitionFieldName;
+    }
+
+    /**
+     * set partitionFieldName
+     * 
+     * @param partitionFieldName the partitionFieldName to set
+     */
+    public void setPartitionFieldName(String partitionFieldName) {
+        this.partitionFieldName = partitionFieldName;
+    }
+
+    /**
+     * get partitionFieldPattern
+     * 
+     * @return the partitionFieldPattern
+     */
+    public String getPartitionFieldPattern() {
+        partitionFieldPattern = (partitionFieldPattern == null) ? "yyyyMMddHH" : partitionFieldPattern;
+        return partitionFieldPattern;
+    }
+
+    /**
+     * set partitionFieldPattern
+     * 
+     * @param partitionFieldPattern the partitionFieldPattern to set
+     */
+    public void setPartitionFieldPattern(String partitionFieldPattern) {
+        this.partitionFieldPattern = partitionFieldPattern;
+    }
+
+    /**
+     * get msgTimeFieldPattern
+     * 
+     * @return the msgTimeFieldPattern
+     */
+    public String getMsgTimeFieldPattern() {
+        msgTimeFieldPattern = (msgTimeFieldPattern == null) ? "yyyy-MM-dd HH:mm:ss" : msgTimeFieldPattern;
+        return msgTimeFieldPattern;
+    }
+
+    /**
+     * set msgTimeFieldPattern
+     * 
+     * @param msgTimeFieldPattern the msgTimeFieldPattern to set
+     */
+    public void setMsgTimeFieldPattern(String msgTimeFieldPattern) {
+        this.msgTimeFieldPattern = msgTimeFieldPattern;
+    }
+
+    /**
+     * get maxPartitionOpenDelayHour
+     * 
+     * @return the maxPartitionOpenDelayHour
+     */
+    public long getMaxPartitionOpenDelayHour() {
+        return maxPartitionOpenDelayHour;
+    }
+
+    /**
+     * set maxPartitionOpenDelayHour
+     * 
+     * @param maxPartitionOpenDelayHour the maxPartitionOpenDelayHour to set
+     */
+    public void setMaxPartitionOpenDelayHour(long maxPartitionOpenDelayHour) {
+        this.maxPartitionOpenDelayHour = maxPartitionOpenDelayHour;
+    }
+
+    /**
+     * get hiveTableName
+     * 
+     * @return the hiveTableName
+     */
+    public String getHiveTableName() {
+        return hiveTableName;
+    }
+
+    /**
+     * set hiveTableName
+     * 
+     * @param hiveTableName the hiveTableName to set
+     */
+    public void setHiveTableName(String hiveTableName) {
+        this.hiveTableName = hiveTableName;
+    }
+
+    /**
+     * parsePartitionPath
+     * 
+     * @param  msgTime
+     * @return
+     */
+    public String parsePartitionPath(long msgTime) {
+        Date dtDate = new Date(msgTime - msgTime % partitionIntervalMs);
+        String result = partitionSubPath;
+        if (result.indexOf(PATTERN_MINUTE) >= 0) {
+            String strHour = FORMAT_MINUTE.get().format(dtDate);
+            result = result.replaceAll(REGEX_MINUTE, strHour);
+        }
+        if (result.indexOf(PATTERN_HOUR) >= 0) {
+            String strHour = FORMAT_HOUR.get().format(dtDate);
+            result = result.replaceAll(REGEX_HOUR, strHour);
+        }
+        if (result.indexOf(PATTERN_DAY) >= 0) {
+            String strHour = FORMAT_DAY.get().format(dtDate);
+            result = result.replaceAll(REGEX_DAY, strHour);
+        }
+        return idRootPath + result;
+    }
+
+    /**
+     * parsePartitionField
+     * 
+     * @param  msgTime
+     * @return
+     */
+    public String parsePartitionField(long msgTime) {
+        SimpleDateFormat format = FORMAT_REPOSITORY.get().get(partitionFieldPattern);
+        if (format == null) {
+            format = new SimpleDateFormat(this.partitionFieldPattern);
+            FORMAT_REPOSITORY.get().put(partitionFieldPattern, format);
+        }
+        long formatTime = msgTime - msgTime % partitionIntervalMs;
+        return format.format(new Date(formatTime));
+    }
+
+    /**
+     * parseMsgTimeField
+     * 
+     * @param  msgTime
+     * @return
+     */
+    public String parseMsgTimeField(long msgTime) {
+        SimpleDateFormat format = FORMAT_REPOSITORY.get().get(msgTimeFieldPattern);
+        if (format == null) {
+            format = new SimpleDateFormat(this.msgTimeFieldPattern);
+            FORMAT_REPOSITORY.get().put(msgTimeFieldPattern, format);
+        }
+        return format.format(new Date(msgTime));
+
+    }
+
+//    public static void main(String[] args) {

Review comment:
       It would be better to remove the commented-out main method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org