Posted to commits@pinot.apache.org by ja...@apache.org on 2020/05/08 23:51:06 UTC
[incubator-pinot] branch master updated: [SegmentGeneratorConfig Cleanup] Make PinotOutputFormat use table config and schema to create segments (#5350)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5cbdfcb [SegmentGeneratorConfig Cleanup] Make PinotOutputFormat use table config and schema to create segments (#5350)
5cbdfcb is described below
commit 5cbdfcb4d54339145eb0e9e53391cd6f7166445b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri May 8 16:50:53 2020 -0700
[SegmentGeneratorConfig Cleanup] Make PinotOutputFormat use table config and schema to create segments (#5350)
Refactor PinotOutputFormat related classes to use table config and schema to create segments
---
.../immutable/ImmutableSegmentLoader.java | 2 +-
...ecordSerialization.java => FieldExtractor.java} | 26 ++-
.../pinot/hadoop/io/JsonBasedFieldExtractor.java | 52 ++++++
.../pinot/hadoop/io/JsonPinotOutputFormat.java | 99 -----------
.../apache/pinot/hadoop/io/PinotOutputFormat.java | 148 ++++++-----------
.../org/apache/pinot/hadoop/io/PinotRecord.java | 83 ---------
.../apache/pinot/hadoop/io/PinotRecordWriter.java | 135 ++++++++-------
.../pinot/hadoop/io/PinotOutputFormatTest.java | 185 ++++++++-------------
8 files changed, 252 insertions(+), 478 deletions(-)
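For context, job-side setup with the new API ends up looking like the sketch below. This is a minimal sketch, not part of the commit: the paths, table name, and schema fields are illustrative, while the setters are the ones added to PinotOutputFormat in this change and exercised by the updated test.

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pinot.hadoop.io.JsonBasedFieldExtractor;
    import org.apache.pinot.hadoop.io.PinotOutputFormat;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class PinotSegmentJobSetup {
      public static void main(String[] args) throws IOException {
        Job job = Job.getInstance();

        // Output and staging paths are illustrative
        PinotOutputFormat.setOutputPath(job, new Path("/output/segments"));
        PinotOutputFormat.setTempSegmentDir(job, "/tmp/pinot-segment-staging");

        // Table config and schema are now passed directly (serialized as JSON into the job conf)
        TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
        PinotOutputFormat.setTableConfig(job, tableConfig);
        Schema schema = new Schema.SchemaBuilder().setSchemaName("myTable")
            .addSingleValueDimension("id", FieldSpec.DataType.INT)
            .addMetric("salary", FieldSpec.DataType.INT).build();
        PinotOutputFormat.setSchema(job, schema);

        // Field extraction replaces the old record (de)serialization hook
        PinotOutputFormat.setFieldExtractorClass(job, JsonBasedFieldExtractor.class);

        job.setOutputFormatClass(PinotOutputFormat.class);
      }
    }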
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index e9dce7e..41e9c89 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -72,7 +72,7 @@ public class ImmutableSegmentLoader {
public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig, @Nullable Schema schema)
throws Exception {
Preconditions
- .checkArgument(indexDir.isDirectory(), "Index directory: {} does not exist or is not a directory", indexDir);
+ .checkArgument(indexDir.isDirectory(), "Index directory: %s does not exist or is not a directory", indexDir);
// Convert segment version if necessary
// NOTE: this step may modify the segment metadata
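The one-line fix above is worth calling out: Guava's Preconditions formats its error template with printf-style %s placeholders, so the previous SLF4J-style {} would have been printed literally instead of the directory path. A minimal illustration:

    import com.google.common.base.Preconditions;

    // With "%s", the argument is substituted into the message:
    Preconditions.checkArgument(false, "Index directory: %s does not exist or is not a directory", "/data/seg");
    // -> java.lang.IllegalArgumentException: Index directory: /data/seg does not exist or is not a directory
    // With the old "{}" template, the message would have contained the literal "{}".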
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordSerialization.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/FieldExtractor.java
similarity index 66%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordSerialization.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/FieldExtractor.java
index d6e27e4..c915e57 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordSerialization.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/FieldExtractor.java
@@ -19,29 +19,25 @@
package org.apache.pinot.hadoop.io;
import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.pinot.spi.data.Schema;
-public interface PinotRecordSerialization<T> {
-
- /**
- * init method, called during the {@link PinotRecordWriter()} object creation
- */
- void init(Configuration conf, Schema schema);
+/**
+ * The FieldExtractor extracts fields from the records.
+ * @param <T> Type of the record
+ */
+public interface FieldExtractor<T> {
/**
- * Serialize object to {@link PinotRecord}, called during the {@link
- * PinotRecordWriter#write(Object, Object)}
+ * Initializes the FieldExtractor.
*/
- PinotRecord serialize(T t)
- throws IOException;
+ void init(Configuration conf, Set<String> fields);
/**
- * Deserialize {@link PinotRecord} to Object.
+ * Extracts the fields from the given record.
*/
- T deserialize(PinotRecord record)
+ Map<String, Object> extractFields(T record)
throws IOException;
-
- void close();
}
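The new interface is small enough that record-type-specific implementations are straightforward. As a hypothetical illustration (not part of this commit), an extractor for records that are already Maps could look like:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical FieldExtractor for Map-based records; shown only to
    // illustrate the contract, not included in this commit.
    public class MapFieldExtractor implements FieldExtractor<Map<String, Object>> {
      private Set<String> _fields;

      @Override
      public void init(Configuration conf, Set<String> fields) {
        _fields = fields;
      }

      @Override
      public Map<String, Object> extractFields(Map<String, Object> record) {
        Map<String, Object> result = new HashMap<>();
        for (String field : _fields) {
          result.put(field, record.get(field));
        }
        return result;
      }
    }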
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonBasedFieldExtractor.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonBasedFieldExtractor.java
new file mode 100644
index 0000000..a204cc9
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonBasedFieldExtractor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.io;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Json-based FieldExtractor converts record into {@link JsonNode} and extracts fields from it.
+ */
+public class JsonBasedFieldExtractor implements FieldExtractor<Object> {
+ private Set<String> _fields;
+ private Map<String, Object> _reuse = new HashMap<>();
+
+ @Override
+ public void init(Configuration conf, Set<String> fields) {
+ _fields = fields;
+ _reuse = new HashMap<>(HashUtil.getHashMapCapacity(fields.size()));
+ }
+
+ @Override
+ public Map<String, Object> extractFields(Object record) {
+ _reuse.clear();
+ JsonNode jsonNode = JsonUtils.objectToJsonNode(record);
+ for (String field : _fields) {
+ _reuse.put(field, jsonNode.get(field));
+ }
+ return _reuse;
+ }
+}
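Since JsonUtils.objectToJsonNode round-trips the record through Jackson, any record Jackson can serialize (a POJO with getters, or simply a Map) works with this extractor. A hedged usage sketch, with the record contents made up for illustration:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    JsonBasedFieldExtractor extractor = new JsonBasedFieldExtractor();
    extractor.init(new Configuration(), new HashSet<>(Arrays.asList("id", "name")));

    Map<String, Object> record = new HashMap<>();
    record.put("id", 1);
    record.put("name", "alice");
    record.put("salary", 1000);

    // Only the configured fields are extracted; values are JsonNode instances.
    // The returned map is reused across calls, so copy it if it must outlive the next call.
    Map<String, Object> fields = extractor.extractFields(record);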
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonPinotOutputFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonPinotOutputFormat.java
deleted file mode 100644
index 69c6b65..0000000
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonPinotOutputFormat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.io;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.JsonUtils;
-
-
-/**
- * OutputFormat implementation for Json source
- */
-public class JsonPinotOutputFormat<K, V extends Serializable> extends PinotOutputFormat<K, V> {
- private static final String JSON_READER_CLASS = "json.reader.class";
-
- @Override
- public void configure(Configuration conf) {
- conf.set(PinotOutputFormat.PINOT_RECORD_SERIALIZATION_CLASS, JsonPinotRecordSerialization.class.getName());
- }
-
- public static void setJsonReaderClass(JobContext context, Class<?> clazz) {
- context.getConfiguration().set(JSON_READER_CLASS, clazz.getName());
- }
-
- public static String getJsonReaderClass(Configuration conf) {
- if (conf.get(JSON_READER_CLASS) == null) {
- throw new RuntimeException("Json reader class not set");
- }
- return conf.get(JSON_READER_CLASS);
- }
-
- public static class JsonPinotRecordSerialization<T> implements PinotRecordSerialization<T> {
- private Schema _schema;
- private Configuration _conf;
- private PinotRecord _record;
-
- @Override
- public void init(Configuration conf, Schema schema) {
- _schema = schema;
- _conf = conf;
- _record = new PinotRecord(_schema);
- }
-
- @Override
- public PinotRecord serialize(T t) {
- _record.clear();
- JsonNode jsonRecord = JsonUtils.objectToJsonNode(t);
- for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
- String column = fieldSpec.getName();
- _record.putField(column, JsonUtils.extractValue(jsonRecord.get(column), fieldSpec));
- }
- return _record;
- }
-
- @Override
- public T deserialize(PinotRecord record)
- throws IOException {
- ObjectNode jsonRecord = JsonUtils.newObjectNode();
- for (String column : _schema.getColumnNames()) {
- jsonRecord.set(column, JsonUtils.objectToJsonNode(record.getValue(column)));
- }
- return JsonUtils.jsonNodeToObject(jsonRecord, getJsonReaderClass(_conf));
- }
-
- @Override
- public void close() {
- }
-
- private Class<T> getJsonReaderClass(Configuration conf) {
- try {
- return (Class<T>) Class.forName(JsonPinotOutputFormat.getJsonReaderClass(conf));
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Error initialize json reader class", e);
- }
- }
- }
-}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java
index abbea40..9b3f8d4 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java
@@ -20,150 +20,100 @@ package org.apache.pinot.hadoop.io;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
/**
* Generic Pinot Output Format implementation.
- *
- * TODO: Support star-tree creation
*/
-public class PinotOutputFormat<K, V> extends FileOutputFormat<K, V> {
-
- private final SegmentGeneratorConfig _segmentConfig;
+public class PinotOutputFormat<T> extends FileOutputFormat<NullWritable, T> {
- // Pinot temp directory to create segment.
+ // Temp directory to create segment
public static final String TEMP_SEGMENT_DIR = "pinot.temp.segment.dir";
- // Name of the table
- public static final String TABLE_NAME = "pinot.table.name";
-
- // Name of the segment.
- public static final String SEGMENT_NAME = "pinot.segment_name";
-
- // Name of the time column.
- public static final String TIME_COLUMN_NAME = "pinot.time_column_name";
+ // Serialized table config
+ public static final String TABLE_CONFIG = "pinot.table.config";
- // file containing schema for the data
- public static final String SCHEMA = "pinot.schema.file";
+ // Serialized schema
+ public static final String SCHEMA = "pinot.schema";
- public static final String PINOT_RECORD_SERIALIZATION_CLASS = "pinot.record.serialization.class";
-
- public PinotOutputFormat() {
- _segmentConfig = new SegmentGeneratorConfig();
- }
+ // Class for the field extractor
+ public static final String FIELD_EXTRACTOR_CLASS = "pinot.field.extractor.class";
public static void setTempSegmentDir(Job job, String segmentDir) {
job.getConfiguration().set(PinotOutputFormat.TEMP_SEGMENT_DIR, segmentDir);
}
public static String getTempSegmentDir(JobContext job) {
- return job.getConfiguration().get(PinotOutputFormat.TEMP_SEGMENT_DIR, ".data_" + getTableName(job));
- }
-
- public static void setTableName(Job job, String table) {
- job.getConfiguration().set(PinotOutputFormat.TABLE_NAME, table);
- }
-
- public static String getTableName(JobContext job) {
- String table = job.getConfiguration().get(PinotOutputFormat.TABLE_NAME);
- if (table == null) {
- throw new RuntimeException("pinot table name not set.");
- }
- return table;
- }
-
- public static void setSegmentName(Job job, String segmentName) {
- job.getConfiguration().set(PinotOutputFormat.SEGMENT_NAME, segmentName);
- }
-
- public static String getSegmentName(JobContext context) {
- String segment = context.getConfiguration().get(PinotOutputFormat.SEGMENT_NAME);
- if (segment == null) {
- throw new RuntimeException("pinot segment name not set.");
- }
- return segment;
+ return job.getConfiguration().get(PinotOutputFormat.TEMP_SEGMENT_DIR);
}
- public static void setTimeColumnName(Job job, String timeColumnName) {
- job.getConfiguration().set(PinotOutputFormat.TIME_COLUMN_NAME, timeColumnName);
+ public static void setTableConfig(Job job, TableConfig tableConfig) {
+ job.getConfiguration().set(PinotOutputFormat.TABLE_CONFIG, tableConfig.toJsonString());
}
- public static String getTimeColumnName(JobContext context) {
- return context.getConfiguration().get(PinotOutputFormat.TIME_COLUMN_NAME);
+ public static TableConfig getTableConfig(JobContext job)
+ throws IOException {
+ return JsonUtils.stringToObject(job.getConfiguration().get(PinotOutputFormat.TABLE_CONFIG), TableConfig.class);
}
public static void setSchema(Job job, Schema schema) {
job.getConfiguration().set(PinotOutputFormat.SCHEMA, schema.toSingleLineJsonString());
}
- public static String getSchema(JobContext context) {
- String schemaFile = context.getConfiguration().get(PinotOutputFormat.SCHEMA);
- if (schemaFile == null) {
- throw new RuntimeException("pinot schema file not set");
- }
- return schemaFile;
- }
-
- public static void setDataWriteSupportClass(Job job, Class<? extends PinotRecordSerialization> pinotSerialization) {
- job.getConfiguration().set(PinotOutputFormat.PINOT_RECORD_SERIALIZATION_CLASS, pinotSerialization.getName());
- }
-
- public static Class<?> getDataWriteSupportClass(JobContext context) {
- String className = context.getConfiguration().get(PinotOutputFormat.PINOT_RECORD_SERIALIZATION_CLASS);
- if (className == null) {
- throw new RuntimeException("pinot data write support class not set");
- }
- try {
- return context.getConfiguration().getClassByName(className);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
+ public static Schema getSchema(JobContext job)
+ throws IOException {
+ return JsonUtils.stringToObject(job.getConfiguration().get(PinotOutputFormat.SCHEMA), Schema.class);
}
- @Override
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ public static SegmentGeneratorConfig getSegmentGeneratorConfig(JobContext job)
throws IOException {
- configure(context.getConfiguration());
- final PinotRecordSerialization dataWriteSupport = getDataWriteSupport(context);
- initSegmentConfig(context);
- Path workDir = getDefaultWorkFile(context, "");
- return new PinotRecordWriter<>(_segmentConfig, context, workDir, dataWriteSupport);
+ TableConfig tableConfig = getTableConfig(job);
+ Schema schema = getSchema(job);
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+ segmentGeneratorConfig.setOutDir(getTempSegmentDir(job) + "/segmentDir");
+ segmentGeneratorConfig.setTableName(TableNameBuilder.extractRawTableName(tableConfig.getTableName()));
+ segmentGeneratorConfig.setFormat(FileFormat.JSON);
+ return segmentGeneratorConfig;
}
- /**
- * The {@link #configure(Configuration)} method called before initialize the {@link
- * RecordWriter} Any implementation of {@link PinotOutputFormat} can use it to set additional
- * configuration properties.
- */
- public void configure(Configuration conf) {
-
+ public static void setFieldExtractorClass(Job job, Class<? extends FieldExtractor> fieldExtractorClass) {
+ job.getConfiguration().set(PinotOutputFormat.FIELD_EXTRACTOR_CLASS, fieldExtractorClass.getName());
}
- private PinotRecordSerialization getDataWriteSupport(TaskAttemptContext context) {
+ public static <T> FieldExtractor<T> getFieldExtractor(JobContext job) {
+ Configuration conf = job.getConfiguration();
try {
- return (PinotRecordSerialization) PinotOutputFormat.getDataWriteSupportClass(context).newInstance();
+ //noinspection unchecked
+ return (FieldExtractor<T>) conf.getClassByName(conf.get(PinotOutputFormat.FIELD_EXTRACTOR_CLASS)).newInstance();
} catch (Exception e) {
- throw new RuntimeException("Error initialize data write support class", e);
+ throw new IllegalStateException(
+ "Caught exception while creating instance of field extractor configured with key: " + FIELD_EXTRACTOR_CLASS);
}
}
- private void initSegmentConfig(JobContext context)
+ public static <T> PinotRecordWriter<T> getPinotRecordWriter(TaskAttemptContext job)
+ throws IOException {
+ SegmentGeneratorConfig segmentGeneratorConfig = getSegmentGeneratorConfig(job);
+ FieldExtractor<T> fieldExtractor = getFieldExtractor(job);
+ fieldExtractor.init(job.getConfiguration(), SchemaUtils.extractSourceFields(segmentGeneratorConfig.getSchema()));
+ return new PinotRecordWriter<>(job, segmentGeneratorConfig, fieldExtractor);
+ }
+
+ @Override
+ public PinotRecordWriter<T> getRecordWriter(TaskAttemptContext job)
throws IOException {
- _segmentConfig.setFormat(FileFormat.JSON);
- _segmentConfig.setOutDir(PinotOutputFormat.getTempSegmentDir(context) + "/segmentDir");
- _segmentConfig.setTableName(PinotOutputFormat.getTableName(context));
- _segmentConfig.setSegmentName(PinotOutputFormat.getSegmentName(context));
- Schema schema = Schema.fromString(PinotOutputFormat.getSchema(context));
- _segmentConfig.setSchema(schema);
- _segmentConfig.setTime(PinotOutputFormat.getTimeColumnName(context), schema);
+ return getPinotRecordWriter(job);
}
}
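Because the output format is now keyed on NullWritable, a reduce task writes records directly as values. A minimal hypothetical reducer (MyRecord, its fromString helper, and the input types are illustrative, not from this commit):

    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical reducer emitting POJOs to PinotOutputFormat<MyRecord>
    public class SegmentReducer extends Reducer<Text, Text, NullWritable, MyRecord> {
      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        for (Text value : values) {
          // Parsing details depend on the job; fromString is a made-up helper
          MyRecord record = MyRecord.fromString(value.toString());
          context.write(NullWritable.get(), record);
        }
      }
    }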
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecord.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecord.java
deleted file mode 100644
index 475a058..0000000
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecord.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.io;
-
-import java.io.IOException;
-import java.util.Collection;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-
-
-/**
- * Container Object for {@link PinotOutputFormat}
- */
-public class PinotRecord extends GenericRow {
-
- private Schema _pinotSchema;
- private Collection<String> _fieldNames;
-
- public PinotRecord(Schema schema) {
- _pinotSchema = schema;
- _fieldNames = _pinotSchema.getColumnNames();
- }
-
- @Override
- public Object getValue(String fieldName) {
- if (!containsField(fieldName)) {
- throw new IllegalArgumentException(String.format("The field name %s not found in the schema", fieldName));
- }
- return super.getValue(fieldName);
- }
-
- @Override
- public void putField(String key, Object value) {
- if (!containsField(key)) {
- throw new IllegalArgumentException(String.format("The field name %s not found in the schema", key));
- }
- super.putField(key, value);
- }
-
- @Override
- public byte[] toBytes()
- throws IOException {
- return super.toBytes();
- }
-
- public Schema getSchema() {
- return _pinotSchema;
- }
-
- public static PinotRecord createOrReuseRecord(PinotRecord record, Schema schema) {
- if (record == null) {
- return new PinotRecord(schema);
- } else {
- record.clear();
- return record;
- }
- }
-
- private boolean containsField(String fieldName) {
- return _fieldNames.contains(fieldName);
- }
-
- @Override
- public String toString() {
- return super.toString();
- }
-}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
index 938bc80..247d431 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
@@ -20,16 +20,18 @@ package org.apache.pinot.hadoop.io;
import java.io.File;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,93 +39,88 @@ import org.slf4j.LoggerFactory;
/**
* Basic Single Threaded {@link RecordWriter}
*/
-public class PinotRecordWriter<K, V> extends RecordWriter<K, V> {
+public class PinotRecordWriter<T> extends RecordWriter<NullWritable, T> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PinotRecordWriter.class);
+ private static final long MAX_FILE_SIZE = 64 * 1000000L;
- private final static Logger LOGGER = LoggerFactory.getLogger(PinotRecordWriter.class);
-
- private final SegmentGeneratorConfig _segmentConfig;
- private final Path _workDir;
- private final String _baseDataDir;
- private PinotRecordSerialization _pinotRecordSerialization;
+ private final SegmentGeneratorConfig _segmentGeneratorConfig;
+ private final FieldExtractor<T> _fieldExtractor;
+ private final File _tempSegmentDir;
+ private final File _dataFileDir;
+ private final File _segmentTarDir;
private final FileHandler _handler;
- private long MAX_FILE_SIZE = 64 * 1000000L;
-
- public PinotRecordWriter(SegmentGeneratorConfig segmentConfig, TaskAttemptContext context, Path workDir,
- PinotRecordSerialization pinotRecordSerialization) {
- _segmentConfig = segmentConfig;
- _workDir = workDir;
- _baseDataDir = PinotOutputFormat.getTempSegmentDir(context) + "/data";
- String filename = PinotOutputFormat.getTableName(context);
- try {
- _handler = new FileHandler(_baseDataDir, filename, ".json", MAX_FILE_SIZE);
- _handler.open(true);
- _pinotRecordSerialization = pinotRecordSerialization;
- _pinotRecordSerialization.init(context.getConfiguration(), segmentConfig.getSchema());
- } catch (Exception e) {
- throw new RuntimeException("Error initialize PinotRecordReader", e);
+ private final FileSystem _fileSystem;
+ private final Path _outputDir;
+
+ public PinotRecordWriter(TaskAttemptContext job, SegmentGeneratorConfig segmentGeneratorConfig,
+ FieldExtractor<T> fieldExtractor)
+ throws IOException {
+ _segmentGeneratorConfig = segmentGeneratorConfig;
+ _fieldExtractor = fieldExtractor;
+
+ _tempSegmentDir = new File(PinotOutputFormat.getTempSegmentDir(job));
+ if (_tempSegmentDir.exists()) {
+ FileUtils.cleanDirectory(_tempSegmentDir);
}
+ _dataFileDir = new File(_tempSegmentDir, "dataFile");
+ FileUtils.forceMkdir(_dataFileDir);
+ _segmentTarDir = new File(_tempSegmentDir, "segmentTar");
+ FileUtils.forceMkdir(_segmentTarDir);
+
+ _handler = new FileHandler(_dataFileDir.getPath(), "data", ".json", MAX_FILE_SIZE);
+ _handler.open(true);
+
+ _fileSystem = FileSystem.get(job.getConfiguration());
+ _outputDir = FileOutputFormat.getOutputPath(job);
}
@Override
- public void write(K key, V value)
- throws IOException, InterruptedException {
- PinotRecord record = _pinotRecordSerialization.serialize(value);
- _handler.write(record.toBytes());
+ public void write(NullWritable key, T value)
+ throws IOException {
+ _handler.write(JsonUtils.objectToBytes(_fieldExtractor.extractFields(value)));
}
@Override
public void close(TaskAttemptContext context)
- throws IOException, InterruptedException {
- _pinotRecordSerialization.close();
+ throws IOException {
_handler.close();
- File dir = new File(_baseDataDir);
- _segmentConfig.setSegmentName(PinotOutputFormat.getSegmentName(context));
- SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
- for (File f : dir.listFiles()) {
- createSegment(f.getPath(), driver);
+
+ File[] dataFiles = _dataFileDir.listFiles();
+ assert dataFiles != null;
+ int numDataFiles = dataFiles.length;
+ for (int i = 0; i < numDataFiles; i++) {
+ createSegment(dataFiles[i], i);
}
- final FileSystem fs = FileSystem.get(new Configuration());
- String localSegmentPath = new File(_segmentConfig.getOutDir(), _segmentConfig.getSegmentName()).getAbsolutePath();
- String localTarPath = getLocalTarFile(PinotOutputFormat.getTempSegmentDir(context));
- LOGGER.info("Trying to tar the segment to: {}", localTarPath);
- TarGzCompressionUtils.createTarGzOfDirectory(localSegmentPath, localTarPath);
- String hdfsTarPath =
- _workDir + "/segmentTar/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
-
- LOGGER.info("*********************************************************************");
- LOGGER.info("Copy from : {} to {}", localTarPath, hdfsTarPath);
- LOGGER.info("*********************************************************************");
- fs.copyFromLocalFile(true, true, new Path(localTarPath), new Path(hdfsTarPath));
- clean(PinotOutputFormat.getTempSegmentDir(context));
+ FileUtils.deleteDirectory(_tempSegmentDir);
}
- private void createSegment(String inputFile, SegmentIndexCreationDriver driver) {
+ private void createSegment(File dataFile, int sequenceId)
+ throws IOException {
+ LOGGER.info("Creating segment from data file: {} of sequence id: {}", dataFile, sequenceId);
+ _segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
+ _segmentGeneratorConfig.setSequenceId(sequenceId);
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
try {
- _segmentConfig.setInputFilePath(inputFile);
- driver.init(_segmentConfig);
+ driver.init(_segmentGeneratorConfig);
driver.build();
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new IllegalStateException("Caught exception while creating segment from data file: " + dataFile);
}
- }
+ String segmentName = driver.getSegmentName();
+ File indexDir = driver.getOutputDirectory();
+ LOGGER.info("Created segment: {} from data file: {} into directory: {}", segmentName, dataFile, indexDir);
- private String getLocalTarFile(String baseDir) {
- String localTarDir = baseDir + "/segmentTar";
- File f = new File(localTarDir);
- if (!f.exists()) {
- f.mkdirs();
- }
- return localTarDir + "/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
- }
+ File segmentTarFile = new File(_segmentTarDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ LOGGER.info("Tarring segment: {} from directory: {} to: {}", segmentName, indexDir, segmentTarFile);
+ TarGzCompressionUtils.createTarGzOfDirectory(indexDir.getPath(), segmentTarFile.getPath());
- /**
- * delete the temp files
- */
- private void clean(String baseDir) {
- File f = new File(baseDir);
- if (f.exists()) {
- f.delete();
- }
+ Path hdfsSegmentTarPath = new Path(_outputDir, segmentTarFile.getName());
+ LOGGER.info("Copying segment tar file from local: {} to HDFS: {}", segmentTarFile, hdfsSegmentTarPath);
+ _fileSystem.copyFromLocalFile(true, new Path(segmentTarFile.getPath()), hdfsSegmentTarPath);
+
+ LOGGER
+ .info("Finish creating segment: {} from data file: {} of sequence id: {} into HDFS: {}", segmentName, dataFile,
+ sequenceId, hdfsSegmentTarPath);
}
}
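Net effect of the rewritten close(): each data file rolled by the FileHandler becomes one segment, which is tarred and copied straight into the job output directory. Assuming the default segment name generator and two rolled data files for table testTable (an illustration consistent with the updated test, which expects testTable_0), the output directory would contain something like:

    /output/segments/
        testTable_0.tar.gz   (segment built from the data file with sequence id 0)
        testTable_1.tar.gz   (segment built from the data file with sequence id 1)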
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java
index e0f5dbe..8b3f14b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java
@@ -20,150 +20,111 @@ package org.apache.pinot.hadoop.io;
import java.io.File;
import java.io.IOException;
-import java.io.Serializable;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
public class PinotOutputFormatTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "PinotOutputFormatTest");
+ private static final String RAW_TABLE_NAME = "testTable";
- private TaskAttemptContext fakeTaskAttemptContext;
- private Job job;
- private Configuration conf;
- private PinotOutputFormat outputFormat;
- private File outputTempDir;
- private String segmentTarPath;
-
- private void init(String indexType)
- throws IOException {
- conf = new Configuration();
- job = Job.getInstance(conf);
- fakeTaskAttemptContext = mock(TaskAttemptContext.class);
- outputFormat = new JsonPinotOutputFormat();
- outputTempDir =
- Files.createTempDirectory(PinotOutputFormatTest.class.getName() + indexType + "_io_output").toFile();
- File workingTempDir =
- Files.createTempDirectory(PinotOutputFormatTest.class.getName() + indexType + "_io_working_dir").toFile();
- // output path
- Path outDir = new Path(outputTempDir.getAbsolutePath());
- PinotOutputFormat.setOutputPath(job, outDir);
- PinotOutputFormat.setTableName(job, "emp");
- PinotOutputFormat.setSegmentName(job, indexType + "segment_one");
- PinotOutputFormat.setTimeColumnName(job, "epochDays");
- PinotOutputFormat.setTempSegmentDir(job, workingTempDir.getAbsolutePath());
-
- Schema schema = Schema.fromString(getSchema());
- PinotOutputFormat.setSchema(job, schema);
- mockTaskAttemptContext(indexType);
- segmentTarPath =
- "_temporary/0/_temporary/attempt_foo_task_" + indexType + "_0123_r_000002_2/part-r-00002/segmentTar";
+ @BeforeClass
+ public void setUp() {
+ FileUtils.deleteQuietly(TEMP_DIR);
}
- private void mockTaskAttemptContext(String indexType) {
- TaskAttemptID fakeTaskId = new TaskAttemptID(new TaskID("foo_task_" + indexType, 123, TaskType.REDUCE, 2), 2);
- when(fakeTaskAttemptContext.getTaskAttemptID()).thenReturn(fakeTaskId);
- when(fakeTaskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ FileUtils.forceDelete(TEMP_DIR);
}
@Test
- public void verifyRawIndex()
- throws Exception {
- runPinotOutputFormatTest("raw");
- }
-
- private void runPinotOutputFormatTest(String indexType)
- throws Exception {
- init(indexType);
- Map<Integer, Emp> inputMap = addTestData();
- validate(inputMap);
- }
-
- private void validate(Map<Integer, Emp> inputMap)
+ public void testPinotOutputFormat()
throws Exception {
- File segmentTarOutput = new File(outputTempDir, segmentTarPath);
- File untarOutput = Files.createTempDirectory(PinotOutputFormatTest.class.getName() + "_segmentUnTar").toFile();
- for (File tarFile : segmentTarOutput.listFiles()) {
- TarGzCompressionUtils.unTar(tarFile, untarOutput);
+ Job job = Job.getInstance();
+ File outputDir = new File(TEMP_DIR, "output");
+ File tempSegmentDir = new File(TEMP_DIR, "tempSegment");
+ PinotOutputFormat.setOutputPath(job, new Path(outputDir.getAbsolutePath()));
+ PinotOutputFormat.setTempSegmentDir(job, tempSegmentDir.getAbsolutePath());
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ PinotOutputFormat.setTableConfig(job, tableConfig);
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("id", FieldSpec.DataType.INT)
+ .addSingleValueDimension("name", FieldSpec.DataType.STRING).addMetric("salary", FieldSpec.DataType.INT)
+ .build();
+ PinotOutputFormat.setSchema(job, schema);
+ PinotOutputFormat.setFieldExtractorClass(job, JsonBasedFieldExtractor.class);
+
+ TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+ when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+ PinotRecordWriter<Employee> pinotRecordWriter = PinotOutputFormat.getPinotRecordWriter(taskAttemptContext);
+ int numRecords = 10;
+ List<Employee> records = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ Employee employee = new Employee(i, "name" + i, 1000 * i);
+ pinotRecordWriter.write(null, employee);
+ records.add(employee);
}
-
- File outputDir = new File(untarOutput, PinotOutputFormat.getSegmentName(fakeTaskAttemptContext));
- RecordReader recordReader = new PinotSegmentRecordReader(outputDir, Schema.fromString(getSchema()), null);
- Map<Integer, GenericRow> resultMap = new HashMap<>();
- while (recordReader.hasNext()) {
+ pinotRecordWriter.close(taskAttemptContext);
+
+ String segmentName = RAW_TABLE_NAME + "_0";
+ File segmentDir = new File(TEMP_DIR, "segment");
+ TarGzCompressionUtils
+ .unTar(new File(outputDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), segmentDir);
+ File indexDir = new File(segmentDir, segmentName);
+ RecordReader recordReader = new PinotSegmentRecordReader(indexDir, null, null);
+ for (Employee record : records) {
GenericRow row = recordReader.next();
- resultMap.put((Integer) row.getValue("id"), row);
- }
-
- Assert.assertEquals(resultMap.size(), inputMap.size());
- Assert.assertEquals(resultMap.get(8).getValue("name"), inputMap.get(8).name);
- }
-
- private Map<Integer, Emp> addTestData()
- throws IOException, InterruptedException {
- int days = 2000;
- int sal = 20;
- RecordWriter<Object, Emp> writer = outputFormat.getRecordWriter(fakeTaskAttemptContext);
- Map<Integer, Emp> inputMap = new HashMap<>();
- for (int i = 0; i < 10; i++) {
- String name = "name " + i;
- Emp e = new Emp(i, name, days + i, sal + i);
- writer.write(null, e);
- inputMap.put(i, e);
+ assertEquals(row.getValue("id"), record.getId());
+ assertEquals(row.getValue("name"), record.getName());
+ assertEquals(row.getValue("salary"), record.getSalary());
}
- writer.close(fakeTaskAttemptContext);
- return inputMap;
+ assertFalse(recordReader.hasNext());
}
- private static class Emp implements Serializable {
+ public static class Employee {
+ private final int _id;
+ private final String _name;
+ private final int _salary;
- public int id;
- public String name;
- public int epochDays;
- public int salary;
+ private Employee(int id, String name, int salary) {
+ _id = id;
+ _name = name;
+ _salary = salary;
+ }
- public Emp(int id, String name, int epochDays, int salary) {
- this.id = id;
- this.name = name;
- this.epochDays = epochDays;
- this.salary = salary;
+ public int getId() {
+ return _id;
}
- @Override
- public String toString() {
- return "{\"Emp\":{" + " \"id\":\"" + id + "\"" + ", \"name\":\""
- + name + "\"" + ", \"epochDays\":\"" + epochDays + "\""
- + ", \"salary\":\"" + salary + "\"" + "}}";
+ public String getName() {
+ return _name;
}
- }
- private String getSchema() {
- return "{\n" + " \"dimensionFieldSpecs\" : [\n" + " {\n" + " \"name\": \"id\",\n"
- + " \"dataType\" : \"INT\",\n" + " \"delimiter\" : null,\n" + " \"singleValueField\" : true\n"
- + " },\n" + " {\n" + " \"name\": \"name\",\n" + " \"dataType\" : \"STRING\",\n"
- + " \"delimiter\" : null,\n" + " \"singleValueField\" : true\n" + " }\n" + " ],\n"
- + " \"timeFieldSpec\" : {\n" + " \"incomingGranularitySpec\" : {\n" + " \"timeType\" : \"DAYS\",\n"
- + " \"dataType\" : \"INT\",\n" + " \"name\" : \"epochDays\"\n" + " }\n" + " },\n"
- + " \"metricFieldSpecs\" : [\n" + " {\n" + " \"name\" : \"salary\",\n"
- + " \"dataType\" : \"INT\",\n" + " \"delimiter\" : null,\n" + " \"singleValueField\" : true\n"
- + " }\n" + " ],\n" + " \"schemaName\" : \"emp\"\n" + "}";
+ public int getSalary() {
+ return _salary;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org