You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/14 05:49:24 UTC
[incubator-iotdb] 01/03: solve hive-connector bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 05b909967b53df33f636988459ce978fa23b52b9
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Jul 13 12:38:03 2020 +0800
solve hive-connector bug
---
.../apache/iotdb/hadoop/tsfile/TSFInputFormat.java | 23 ++++++++++++++--------
.../org/apache/iotdb/hive/TSFHiveInputFormat.java | 11 +++++++----
.../java/org/apache/iotdb/hive/TsFileSerDe.java | 16 +++++++++++----
3 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java
index 4808af2..6df9a0e 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.hadoop.tsfile;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -73,7 +75,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
/**
* Set the deltaObjectIds which want to be read
*
- * @param job hadoop job
+ * @param job hadoop job
* @param value the deltaObjectIds will be read
* @throws TSFHadoopException
*/
@@ -95,8 +97,8 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
* Get the deltaObjectIds which want to be read
*
* @param configuration
- * @return List of device, if configuration has been set the deviceIds.
- * null, if configuration has not been set the deviceIds.
+ * @return the list of devices if the deviceIds have been set in the configuration; null if the
+ * deviceIds have not been set.
*/
public static List<String> getReadDeviceIds(Configuration configuration) {
String deviceIds = configuration.get(READ_DELTAOBJECTS);
@@ -111,7 +113,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
/**
* Set the measurementIds which want to be read
*
- * @param job hadoop job
+ * @param job hadoop job
* @param value the measurementIds will be read
* @throws TSFHadoopException
*/
@@ -236,11 +238,13 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
+ job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE, true);
List<FileStatus> listFileStatus = super.listStatus(job);
return new ArrayList<>(getTSFInputSplit(job.getConfiguration(), listFileStatus, logger));
}
- public static List<TSFInputSplit> getTSFInputSplit(Configuration configuration, List<FileStatus> listFileStatus, Logger logger) throws IOException {
+ public static List<TSFInputSplit> getTSFInputSplit(Configuration configuration,
+ List<FileStatus> listFileStatus, Logger logger) throws IOException {
BlockLocation[] blockLocations;
List<TSFInputSplit> splits = new ArrayList<>();
// get the all file in the directory
@@ -250,6 +254,9 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
logger.info("The file path is {}", fileStatus.getPath());
// Get the file path
Path path = fileStatus.getPath();
+ if (!path.toString().endsWith(TSFILE_SUFFIX)) {
+ continue;
+ }
// Get the file length
long length = fileStatus.getLen();
// Check the file length. if the length is less than 0, return the
@@ -273,8 +280,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
}
/**
- * get the TSFInputSplit from tsfMetaData and hdfs block location
- * information with the filter
+ * get the TSFInputSplit from tsfMetaData and hdfs block location information with the filter
*
* @throws IOException
*/
@@ -282,7 +288,8 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
throws IOException {
List<TSFInputSplit> splits = new ArrayList<>();
for (BlockLocation blockLocation : blockLocations) {
- splits.add(new TSFInputSplit(path, blockLocation.getHosts(), blockLocation.getOffset(), blockLocation.getLength()));
+ splits.add(new TSFInputSplit(path, blockLocation.getHosts(), blockLocation.getOffset(),
+ blockLocation.getLength()));
}
return splits;
}
diff --git a/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java b/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java
index bb2d5ca..cf7b3e0 100644
--- a/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java
+++ b/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java
@@ -29,8 +29,8 @@ import java.io.IOException;
import java.util.Arrays;
/**
- * The class implement is same as {@link org.apache.iotdb.hadoop.tsfile.TSFInputFormat}
- * and is customized for Hive to implements JobConfigurable interface.
+ * The implementation of this class is the same as {@link org.apache.iotdb.hadoop.tsfile.TSFInputFormat} and is
+ * customized for Hive to implement the JobConfigurable interface.
*/
public class TSFHiveInputFormat extends FileInputFormat<NullWritable, MapWritable> {
@@ -39,13 +39,16 @@ public class TSFHiveInputFormat extends FileInputFormat<NullWritable, MapWritabl
@Override
- public RecordReader<NullWritable, MapWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ public RecordReader<NullWritable, MapWritable> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
return new TSFHiveRecordReader(split, job);
}
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- return TSFInputFormat.getTSFInputSplit(job, Arrays.asList(super.listStatus(job)), logger).toArray(new InputSplit[0]);
+ job.setBoolean(INPUT_DIR_RECURSIVE, true);
+ return TSFInputFormat.getTSFInputSplit(job, Arrays.asList(super.listStatus(job)), logger)
+ .toArray(new InputSplit[0]);
}
}
diff --git a/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java b/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java
index 9263f2a..35d94e9 100644
--- a/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java
+++ b/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java
@@ -18,6 +18,15 @@
*/
package org.apache.iotdb.hive;
+import static org.apache.iotdb.hadoop.tsfile.TSFInputFormat.READ_DELTAOBJECTS;
+import static org.apache.iotdb.hadoop.tsfile.TSFInputFormat.READ_MEASUREMENTID;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -36,9 +45,6 @@ import org.apache.iotdb.hadoop.tsfile.record.HDFSTSRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.util.*;
-
public class TsFileSerDe extends AbstractSerDe {
private static final Logger logger = LoggerFactory.getLogger(TsFileSerDe.class);
@@ -63,7 +69,6 @@ public class TsFileSerDe extends AbstractSerDe {
deviceId = tbl.getProperty(DEVICE_ID);
-
if (columnNameProperty == null || columnNameProperty.isEmpty()
|| columnTypeProperty == null || columnTypeProperty.isEmpty()) {
columnNames = Collections.emptyList();
@@ -80,6 +85,9 @@ public class TsFileSerDe extends AbstractSerDe {
throw new TsFileSerDeException("len(columnNames) != len(columnTypes)");
}
+ conf.set(READ_DELTAOBJECTS, deviceId);
+ conf.set(READ_MEASUREMENTID, columnNames.get(1));
+
oi = createObjectInspector();
}