Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/14 05:49:24 UTC

[incubator-iotdb] 01/03: solve hive-connector bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 05b909967b53df33f636988459ce978fa23b52b9
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Jul 13 12:38:03 2020 +0800

    solve hive-connector bug
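    
    What the diff below does, in commit-body terms: the input formats now
    list input directories recursively and skip non-TsFile entries when
    generating splits, and TsFileSerDe pushes the table's device id and
    measurement column into the job configuration for the record reader.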
---
 .../apache/iotdb/hadoop/tsfile/TSFInputFormat.java | 23 ++++++++++++++--------
 .../org/apache/iotdb/hive/TSFHiveInputFormat.java  | 11 +++++++----
 .../java/org/apache/iotdb/hive/TsFileSerDe.java    | 16 +++++++++++----
 3 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java
index 4808af2..6df9a0e 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.hadoop.tsfile;
 
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -73,7 +75,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
   /**
    * Set the deltaObjectIds which want to be read
    *
-   * @param job hadoop job
+   * @param job   hadoop job
    * @param value the deltaObjectIds will be read
    * @throws TSFHadoopException
    */
@@ -95,8 +97,8 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
    * Get the deltaObjectIds which want to be read
    *
    * @param configuration
-   * @return List of device, if configuration has been set the deviceIds.
-   * 		   null, if configuration has not been set the deviceIds.
+   * @return List of device, if configuration has been set the deviceIds. null, if configuration has
+   * not been set the deviceIds.
    */
   public static List<String> getReadDeviceIds(Configuration configuration) {
     String deviceIds = configuration.get(READ_DELTAOBJECTS);
@@ -111,7 +113,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
   /**
    * Set the measurementIds which want to be read
    *
-   * @param job hadoop job
+   * @param job   hadoop job
    * @param value the measurementIds will be read
    * @throws TSFHadoopException
    */
@@ -236,11 +238,13 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
 
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
+    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE, true);
     List<FileStatus> listFileStatus = super.listStatus(job);
     return new ArrayList<>(getTSFInputSplit(job.getConfiguration(), listFileStatus, logger));
   }
 
-  public static List<TSFInputSplit> getTSFInputSplit(Configuration configuration, List<FileStatus> listFileStatus, Logger logger) throws IOException {
+  public static List<TSFInputSplit> getTSFInputSplit(Configuration configuration,
+      List<FileStatus> listFileStatus, Logger logger) throws IOException {
     BlockLocation[] blockLocations;
     List<TSFInputSplit> splits = new ArrayList<>();
     // get the all file in the directory
@@ -250,6 +254,9 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
       logger.info("The file path is {}", fileStatus.getPath());
       // Get the file path
       Path path = fileStatus.getPath();
+      if (!path.toString().endsWith(TSFILE_SUFFIX)) {
+        continue;
+      }
       // Get the file length
       long length = fileStatus.getLen();
       // Check the file length. if the length is less than 0, return the
@@ -273,8 +280,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
   }
 
   /**
-   * get the TSFInputSplit from tsfMetaData and hdfs block location
-   * information with the filter
+   * get the TSFInputSplit from tsfMetaData and hdfs block location information with the filter
    *
    * @throws IOException
    */
@@ -282,7 +288,8 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
       throws IOException {
     List<TSFInputSplit> splits = new ArrayList<>();
     for (BlockLocation blockLocation : blockLocations) {
-      splits.add(new TSFInputSplit(path, blockLocation.getHosts(), blockLocation.getOffset(), blockLocation.getLength()));
+      splits.add(new TSFInputSplit(path, blockLocation.getHosts(), blockLocation.getOffset(),
+          blockLocation.getLength()));
     }
     return splits;
   }
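The two functional changes in this file carry the fix: getSplits() now forces recursive listing of the input directory (IoTDB 0.10 lays TsFiles out in per-storage-group subdirectories), and any listed entry whose name lacks the TsFile suffix is skipped before split generation, so companion files sitting in the same directories no longer yield unreadable splits. A minimal, self-contained sketch of that filter step; the class and method names are illustrative, only TSFILE_SUFFIX comes from the commit:

    import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FileStatus;

    public class TsFileStatusFilter {

      // Keep only entries ending with the TsFile suffix; recursive listing
      // also surfaces companion files (e.g. *.resource) that the TsFile
      // reader cannot parse.
      public static List<FileStatus> filterTsFiles(List<FileStatus> statuses) {
        List<FileStatus> tsFiles = new ArrayList<>();
        for (FileStatus status : statuses) {
          if (status.getPath().toString().endsWith(TSFILE_SUFFIX)) {
            tsFiles.add(status);
          }
        }
        return tsFiles;
      }
    }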
diff --git a/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java b/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java
index bb2d5ca..cf7b3e0 100644
--- a/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java
+++ b/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java
@@ -29,8 +29,8 @@ import java.io.IOException;
 import java.util.Arrays;
 
 /**
- * The class implement is same as {@link org.apache.iotdb.hadoop.tsfile.TSFInputFormat}
- * and is customized for Hive to implements JobConfigurable interface.
+ * The class implement is same as {@link org.apache.iotdb.hadoop.tsfile.TSFInputFormat} and is
+ * customized for Hive to implements JobConfigurable interface.
  */
 public class TSFHiveInputFormat extends FileInputFormat<NullWritable, MapWritable> {
 
@@ -39,13 +39,16 @@ public class TSFHiveInputFormat extends FileInputFormat<NullWritable, MapWritabl
 
 
   @Override
-  public RecordReader<NullWritable, MapWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+  public RecordReader<NullWritable, MapWritable> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
     return new TSFHiveRecordReader(split, job);
   }
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return TSFInputFormat.getTSFInputSplit(job, Arrays.asList(super.listStatus(job)), logger).toArray(new InputSplit[0]);
+    job.setBoolean(INPUT_DIR_RECURSIVE, true);
+    return TSFInputFormat.getTSFInputSplit(job, Arrays.asList(super.listStatus(job)), logger)
+        .toArray(new InputSplit[0]);
   }
 
 }
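Same flag, mapred flavor: this getSplits() cannot rely on the mapreduce-side override in TSFInputFormat.getSplits(), so it sets INPUT_DIR_RECURSIVE (the standard Hadoop property mapreduce.input.fileinputformat.input.dir.recursive) on the JobConf itself before listing. A hedged sketch of why the flag matters for any job reading a nested TsFile layout; the directory layout in the comment is an assumption:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class RecursiveListingExample {

      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Without this, listing an input path that contains subdirectories
        // fails at split generation ("Not a file"); with it, nested files
        // such as <input>/<storage-group>/x.tsfile become job inputs.
        job.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
      }
    }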
diff --git a/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java b/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java
index 9263f2a..35d94e9 100644
--- a/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java
+++ b/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java
@@ -18,6 +18,15 @@
  */
 package org.apache.iotdb.hive;
 
+import static org.apache.iotdb.hadoop.tsfile.TSFInputFormat.READ_DELTAOBJECTS;
+import static org.apache.iotdb.hadoop.tsfile.TSFInputFormat.READ_MEASUREMENTID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -36,9 +45,6 @@ import org.apache.iotdb.hadoop.tsfile.record.HDFSTSRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.util.*;
-
 public class TsFileSerDe extends AbstractSerDe {
 
   private static final Logger logger = LoggerFactory.getLogger(TsFileSerDe.class);
@@ -63,7 +69,6 @@ public class TsFileSerDe extends AbstractSerDe {
 
     deviceId = tbl.getProperty(DEVICE_ID);
 
-
     if (columnNameProperty == null || columnNameProperty.isEmpty()
     || columnTypeProperty == null || columnTypeProperty.isEmpty()) {
       columnNames = Collections.emptyList();
@@ -80,6 +85,9 @@ public class TsFileSerDe extends AbstractSerDe {
       throw new TsFileSerDeException("len(columnNames) != len(columnTypes)");
     }
 
+    conf.set(READ_DELTAOBJECTS, deviceId);
+    conf.set(READ_MEASUREMENTID, columnNames.get(1));
+
     oi = createObjectInspector();
   }
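
The SerDe change is the read-path half of the fix: initialize() now records the table's device id and first measurement column under TSFInputFormat's configuration keys, so the record reader can restrict deserialization to the series the Hive table actually maps. A rough round-trip sketch using only the public pieces visible in the diffs above; the device and sensor names are placeholders:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iotdb.hadoop.tsfile.TSFInputFormat;

    public class PushdownRoundTrip {

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Producer side (what TsFileSerDe.initialize() now does):
        conf.set(TSFInputFormat.READ_DELTAOBJECTS, "root.ln.wf01.wt01");
        conf.set(TSFInputFormat.READ_MEASUREMENTID, "sensor_1");
        // Consumer side: the input format recovers the device list
        // from the same key; expect a single-element list here.
        List<String> devices = TSFInputFormat.getReadDeviceIds(conf);
        System.out.println(devices);
      }
    }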