You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/05/08 23:51:06 UTC

[incubator-pinot] branch master updated: [SegmentGeneratorConfig Cleanup] Make PinotOutputFormat use table config and schema to create segments (#5350)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cbdfcb  [SegmentGeneratorConfig Cleanup] Make PinotOutputFormat use table config and schema to create segments (#5350)
5cbdfcb is described below

commit 5cbdfcb4d54339145eb0e9e53391cd6f7166445b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri May 8 16:50:53 2020 -0700

    [SegmentGeneratorConfig Cleanup] Make PinotOutputFormat use table config and schema to create segments (#5350)
    
    Refactor PinotOutputFormat related classes to use table config and schema to create segments
---
 .../immutable/ImmutableSegmentLoader.java          |   2 +-
 ...ecordSerialization.java => FieldExtractor.java} |  26 ++-
 .../pinot/hadoop/io/JsonBasedFieldExtractor.java   |  52 ++++++
 .../pinot/hadoop/io/JsonPinotOutputFormat.java     |  99 -----------
 .../apache/pinot/hadoop/io/PinotOutputFormat.java  | 148 ++++++-----------
 .../org/apache/pinot/hadoop/io/PinotRecord.java    |  83 ---------
 .../apache/pinot/hadoop/io/PinotRecordWriter.java  | 135 ++++++++-------
 .../pinot/hadoop/io/PinotOutputFormatTest.java     | 185 ++++++++-------------
 8 files changed, 252 insertions(+), 478 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index e9dce7e..41e9c89 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -72,7 +72,7 @@ public class ImmutableSegmentLoader {
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig, @Nullable Schema schema)
       throws Exception {
     Preconditions
-        .checkArgument(indexDir.isDirectory(), "Index directory: {} does not exist or is not a directory", indexDir);
+        .checkArgument(indexDir.isDirectory(), "Index directory: %s does not exist or is not a directory", indexDir);
 
     // Convert segment version if necessary
     // NOTE: this step may modify the segment metadata
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordSerialization.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/FieldExtractor.java
similarity index 66%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordSerialization.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/FieldExtractor.java
index d6e27e4..c915e57 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordSerialization.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/FieldExtractor.java
@@ -19,29 +19,25 @@
 package org.apache.pinot.hadoop.io;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pinot.spi.data.Schema;
 
 
-public interface PinotRecordSerialization<T> {
-
-  /**
-   * init method, called during the {@link PinotRecordWriter()} object creation
-   */
-  void init(Configuration conf, Schema schema);
+/**
+ * The FieldExtractor extracts fields from the records.
+ * @param <T> Type of the record
+ */
+public interface FieldExtractor<T> {
 
   /**
-   * Serialize object to {@link PinotRecord}, called during the {@link
-   * PinotRecordWriter#write(Object, Object)}
+   * Initializes the FieldExtractor.
    */
-  PinotRecord serialize(T t)
-      throws IOException;
+  void init(Configuration conf, Set<String> fields);
 
   /**
-   * Deserialize {@link PinotRecord} to Object.
+   * Extracts the fields from the given record.
    */
-  T deserialize(PinotRecord record)
+  Map<String, Object> extractFields(T record)
       throws IOException;
-
-  void close();
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonBasedFieldExtractor.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonBasedFieldExtractor.java
new file mode 100644
index 0000000..a204cc9
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonBasedFieldExtractor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.io;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Json-based FieldExtractor converts record into {@link JsonNode} and extracts fields from it.
+ */
+public class JsonBasedFieldExtractor implements FieldExtractor<Object> {
+  private Set<String> _fields;
+  private Map<String, Object> _reuse = new HashMap<>();
+
+  @Override
+  public void init(Configuration conf, Set<String> fields) {
+    _fields = fields;
+    _reuse = new HashMap<>(HashUtil.getHashMapCapacity(fields.size()));
+  }
+
+  @Override
+  public Map<String, Object> extractFields(Object record) {
+    _reuse.clear();
+    JsonNode jsonNode = JsonUtils.objectToJsonNode(record);
+    for (String field : _fields) {
+      _reuse.put(field, jsonNode.get(field));
+    }
+    return _reuse;
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonPinotOutputFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonPinotOutputFormat.java
deleted file mode 100644
index 69c6b65..0000000
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/JsonPinotOutputFormat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.io;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.JsonUtils;
-
-
-/**
- * OutputFormat implementation for Json source
- */
-public class JsonPinotOutputFormat<K, V extends Serializable> extends PinotOutputFormat<K, V> {
-  private static final String JSON_READER_CLASS = "json.reader.class";
-
-  @Override
-  public void configure(Configuration conf) {
-    conf.set(PinotOutputFormat.PINOT_RECORD_SERIALIZATION_CLASS, JsonPinotRecordSerialization.class.getName());
-  }
-
-  public static void setJsonReaderClass(JobContext context, Class<?> clazz) {
-    context.getConfiguration().set(JSON_READER_CLASS, clazz.getName());
-  }
-
-  public static String getJsonReaderClass(Configuration conf) {
-    if (conf.get(JSON_READER_CLASS) == null) {
-      throw new RuntimeException("Json reader class not set");
-    }
-    return conf.get(JSON_READER_CLASS);
-  }
-
-  public static class JsonPinotRecordSerialization<T> implements PinotRecordSerialization<T> {
-    private Schema _schema;
-    private Configuration _conf;
-    private PinotRecord _record;
-
-    @Override
-    public void init(Configuration conf, Schema schema) {
-      _schema = schema;
-      _conf = conf;
-      _record = new PinotRecord(_schema);
-    }
-
-    @Override
-    public PinotRecord serialize(T t) {
-      _record.clear();
-      JsonNode jsonRecord = JsonUtils.objectToJsonNode(t);
-      for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
-        String column = fieldSpec.getName();
-        _record.putField(column, JsonUtils.extractValue(jsonRecord.get(column), fieldSpec));
-      }
-      return _record;
-    }
-
-    @Override
-    public T deserialize(PinotRecord record)
-        throws IOException {
-      ObjectNode jsonRecord = JsonUtils.newObjectNode();
-      for (String column : _schema.getColumnNames()) {
-        jsonRecord.set(column, JsonUtils.objectToJsonNode(record.getValue(column)));
-      }
-      return JsonUtils.jsonNodeToObject(jsonRecord, getJsonReaderClass(_conf));
-    }
-
-    @Override
-    public void close() {
-    }
-
-    private Class<T> getJsonReaderClass(Configuration conf) {
-      try {
-        return (Class<T>) Class.forName(JsonPinotOutputFormat.getJsonReaderClass(conf));
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException("Error initialize json reader class", e);
-      }
-    }
-  }
-}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java
index abbea40..9b3f8d4 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotOutputFormat.java
@@ -20,150 +20,100 @@ package org.apache.pinot.hadoop.io;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 /**
  * Generic Pinot Output Format implementation.
- *
- * TODO: Support star-tree creation
  */
-public class PinotOutputFormat<K, V> extends FileOutputFormat<K, V> {
-
-  private final SegmentGeneratorConfig _segmentConfig;
+public class PinotOutputFormat<T> extends FileOutputFormat<NullWritable, T> {
 
-  // Pinot temp directory to create segment.
+  // Temp directory to create segment
   public static final String TEMP_SEGMENT_DIR = "pinot.temp.segment.dir";
 
-  // Name of the table
-  public static final String TABLE_NAME = "pinot.table.name";
-
-  // Name of the segment.
-  public static final String SEGMENT_NAME = "pinot.segment_name";
-
-  // Name of the time column.
-  public static final String TIME_COLUMN_NAME = "pinot.time_column_name";
+  // Serialized table config
+  public static final String TABLE_CONFIG = "pinot.table.config";
 
-  // file containing schema for the data
-  public static final String SCHEMA = "pinot.schema.file";
+  // Serialized schema
+  public static final String SCHEMA = "pinot.schema";
 
-  public static final String PINOT_RECORD_SERIALIZATION_CLASS = "pinot.record.serialization.class";
-
-  public PinotOutputFormat() {
-    _segmentConfig = new SegmentGeneratorConfig();
-  }
+  // Class for the field extractor
+  public static final String FIELD_EXTRACTOR_CLASS = "pinot.field.extractor.class";
 
   public static void setTempSegmentDir(Job job, String segmentDir) {
     job.getConfiguration().set(PinotOutputFormat.TEMP_SEGMENT_DIR, segmentDir);
   }
 
   public static String getTempSegmentDir(JobContext job) {
-    return job.getConfiguration().get(PinotOutputFormat.TEMP_SEGMENT_DIR, ".data_" + getTableName(job));
-  }
-
-  public static void setTableName(Job job, String table) {
-    job.getConfiguration().set(PinotOutputFormat.TABLE_NAME, table);
-  }
-
-  public static String getTableName(JobContext job) {
-    String table = job.getConfiguration().get(PinotOutputFormat.TABLE_NAME);
-    if (table == null) {
-      throw new RuntimeException("pinot table name not set.");
-    }
-    return table;
-  }
-
-  public static void setSegmentName(Job job, String segmentName) {
-    job.getConfiguration().set(PinotOutputFormat.SEGMENT_NAME, segmentName);
-  }
-
-  public static String getSegmentName(JobContext context) {
-    String segment = context.getConfiguration().get(PinotOutputFormat.SEGMENT_NAME);
-    if (segment == null) {
-      throw new RuntimeException("pinot segment name not set.");
-    }
-    return segment;
+    return job.getConfiguration().get(PinotOutputFormat.TEMP_SEGMENT_DIR);
   }
 
-  public static void setTimeColumnName(Job job, String timeColumnName) {
-    job.getConfiguration().set(PinotOutputFormat.TIME_COLUMN_NAME, timeColumnName);
+  public static void setTableConfig(Job job, TableConfig tableConfig) {
+    job.getConfiguration().set(PinotOutputFormat.TABLE_CONFIG, tableConfig.toJsonString());
   }
 
-  public static String getTimeColumnName(JobContext context) {
-    return context.getConfiguration().get(PinotOutputFormat.TIME_COLUMN_NAME);
+  public static TableConfig getTableConfig(JobContext job)
+      throws IOException {
+    return JsonUtils.stringToObject(job.getConfiguration().get(PinotOutputFormat.TABLE_CONFIG), TableConfig.class);
   }
 
   public static void setSchema(Job job, Schema schema) {
     job.getConfiguration().set(PinotOutputFormat.SCHEMA, schema.toSingleLineJsonString());
   }
 
-  public static String getSchema(JobContext context) {
-    String schemaFile = context.getConfiguration().get(PinotOutputFormat.SCHEMA);
-    if (schemaFile == null) {
-      throw new RuntimeException("pinot schema file not set");
-    }
-    return schemaFile;
-  }
-
-  public static void setDataWriteSupportClass(Job job, Class<? extends PinotRecordSerialization> pinotSerialization) {
-    job.getConfiguration().set(PinotOutputFormat.PINOT_RECORD_SERIALIZATION_CLASS, pinotSerialization.getName());
-  }
-
-  public static Class<?> getDataWriteSupportClass(JobContext context) {
-    String className = context.getConfiguration().get(PinotOutputFormat.PINOT_RECORD_SERIALIZATION_CLASS);
-    if (className == null) {
-      throw new RuntimeException("pinot data write support class not set");
-    }
-    try {
-      return context.getConfiguration().getClassByName(className);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
+  public static Schema getSchema(JobContext job)
+      throws IOException {
+    return JsonUtils.stringToObject(job.getConfiguration().get(PinotOutputFormat.SCHEMA), Schema.class);
   }
 
-  @Override
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+  public static SegmentGeneratorConfig getSegmentGeneratorConfig(JobContext job)
       throws IOException {
-    configure(context.getConfiguration());
-    final PinotRecordSerialization dataWriteSupport = getDataWriteSupport(context);
-    initSegmentConfig(context);
-    Path workDir = getDefaultWorkFile(context, "");
-    return new PinotRecordWriter<>(_segmentConfig, context, workDir, dataWriteSupport);
+    TableConfig tableConfig = getTableConfig(job);
+    Schema schema = getSchema(job);
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setOutDir(getTempSegmentDir(job) + "/segmentDir");
+    segmentGeneratorConfig.setTableName(TableNameBuilder.extractRawTableName(tableConfig.getTableName()));
+    segmentGeneratorConfig.setFormat(FileFormat.JSON);
+    return segmentGeneratorConfig;
   }
 
-  /**
-   * The {@link #configure(Configuration)} method called before initialize the  {@link
-   * RecordWriter} Any implementation of {@link PinotOutputFormat} can use it to set additional
-   * configuration properties.
-   */
-  public void configure(Configuration conf) {
-
+  public static void setFieldExtractorClass(Job job, Class<? extends FieldExtractor> fieldExtractorClass) {
+    job.getConfiguration().set(PinotOutputFormat.FIELD_EXTRACTOR_CLASS, fieldExtractorClass.getName());
   }
 
-  private PinotRecordSerialization getDataWriteSupport(TaskAttemptContext context) {
+  public static <T> FieldExtractor<T> getFieldExtractor(JobContext job) {
+    Configuration conf = job.getConfiguration();
     try {
-      return (PinotRecordSerialization) PinotOutputFormat.getDataWriteSupportClass(context).newInstance();
+      //noinspection unchecked
+      return (FieldExtractor<T>) conf.getClassByName(conf.get(PinotOutputFormat.FIELD_EXTRACTOR_CLASS)).newInstance();
     } catch (Exception e) {
-      throw new RuntimeException("Error initialize data write support class", e);
+      throw new IllegalStateException(
+          "Caught exception while creating instance of field extractor configured with key: " + FIELD_EXTRACTOR_CLASS);
     }
   }
 
-  private void initSegmentConfig(JobContext context)
+  public static <T> PinotRecordWriter<T> getPinotRecordWriter(TaskAttemptContext job)
+      throws IOException {
+    SegmentGeneratorConfig segmentGeneratorConfig = getSegmentGeneratorConfig(job);
+    FieldExtractor<T> fieldExtractor = getFieldExtractor(job);
+    fieldExtractor.init(job.getConfiguration(), SchemaUtils.extractSourceFields(segmentGeneratorConfig.getSchema()));
+    return new PinotRecordWriter<>(job, segmentGeneratorConfig, fieldExtractor);
+  }
+
+  @Override
+  public PinotRecordWriter<T> getRecordWriter(TaskAttemptContext job)
       throws IOException {
-    _segmentConfig.setFormat(FileFormat.JSON);
-    _segmentConfig.setOutDir(PinotOutputFormat.getTempSegmentDir(context) + "/segmentDir");
-    _segmentConfig.setTableName(PinotOutputFormat.getTableName(context));
-    _segmentConfig.setSegmentName(PinotOutputFormat.getSegmentName(context));
-    Schema schema = Schema.fromString(PinotOutputFormat.getSchema(context));
-    _segmentConfig.setSchema(schema);
-    _segmentConfig.setTime(PinotOutputFormat.getTimeColumnName(context), schema);
+    return getPinotRecordWriter(job);
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecord.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecord.java
deleted file mode 100644
index 475a058..0000000
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecord.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.io;
-
-import java.io.IOException;
-import java.util.Collection;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-
-
-/**
- * Container Object for {@link PinotOutputFormat}
- */
-public class PinotRecord extends GenericRow {
-
-  private Schema _pinotSchema;
-  private Collection<String> _fieldNames;
-
-  public PinotRecord(Schema schema) {
-    _pinotSchema = schema;
-    _fieldNames = _pinotSchema.getColumnNames();
-  }
-
-  @Override
-  public Object getValue(String fieldName) {
-    if (!containsField(fieldName)) {
-      throw new IllegalArgumentException(String.format("The field name %s not found in the schema", fieldName));
-    }
-    return super.getValue(fieldName);
-  }
-
-  @Override
-  public void putField(String key, Object value) {
-    if (!containsField(key)) {
-      throw new IllegalArgumentException(String.format("The field name %s not found in the schema", key));
-    }
-    super.putField(key, value);
-  }
-
-  @Override
-  public byte[] toBytes()
-      throws IOException {
-    return super.toBytes();
-  }
-
-  public Schema getSchema() {
-    return _pinotSchema;
-  }
-
-  public static PinotRecord createOrReuseRecord(PinotRecord record, Schema schema) {
-    if (record == null) {
-      return new PinotRecord(schema);
-    } else {
-      record.clear();
-      return record;
-    }
-  }
-
-  private boolean containsField(String fieldName) {
-    return _fieldNames.contains(fieldName);
-  }
-
-  @Override
-  public String toString() {
-    return super.toString();
-  }
-}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
index 938bc80..247d431 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/PinotRecordWriter.java
@@ -20,16 +20,18 @@ package org.apache.pinot.hadoop.io;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,93 +39,88 @@ import org.slf4j.LoggerFactory;
 /**
  * Basic Single Threaded {@link RecordWriter}
  */
-public class PinotRecordWriter<K, V> extends RecordWriter<K, V> {
+public class PinotRecordWriter<T> extends RecordWriter<NullWritable, T> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRecordWriter.class);
+  private static final long MAX_FILE_SIZE = 64 * 1000000L;
 
-  private final static Logger LOGGER = LoggerFactory.getLogger(PinotRecordWriter.class);
-
-  private final SegmentGeneratorConfig _segmentConfig;
-  private final Path _workDir;
-  private final String _baseDataDir;
-  private PinotRecordSerialization _pinotRecordSerialization;
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final FieldExtractor<T> _fieldExtractor;
+  private final File _tempSegmentDir;
+  private final File _dataFileDir;
+  private final File _segmentTarDir;
   private final FileHandler _handler;
-  private long MAX_FILE_SIZE = 64 * 1000000L;
-
-  public PinotRecordWriter(SegmentGeneratorConfig segmentConfig, TaskAttemptContext context, Path workDir,
-      PinotRecordSerialization pinotRecordSerialization) {
-    _segmentConfig = segmentConfig;
-    _workDir = workDir;
-    _baseDataDir = PinotOutputFormat.getTempSegmentDir(context) + "/data";
-    String filename = PinotOutputFormat.getTableName(context);
-    try {
-      _handler = new FileHandler(_baseDataDir, filename, ".json", MAX_FILE_SIZE);
-      _handler.open(true);
-      _pinotRecordSerialization = pinotRecordSerialization;
-      _pinotRecordSerialization.init(context.getConfiguration(), segmentConfig.getSchema());
-    } catch (Exception e) {
-      throw new RuntimeException("Error initialize PinotRecordReader", e);
+  private final FileSystem _fileSystem;
+  private final Path _outputDir;
+
+  public PinotRecordWriter(TaskAttemptContext job, SegmentGeneratorConfig segmentGeneratorConfig,
+      FieldExtractor<T> fieldExtractor)
+      throws IOException {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _fieldExtractor = fieldExtractor;
+
+    _tempSegmentDir = new File(PinotOutputFormat.getTempSegmentDir(job));
+    if (_tempSegmentDir.exists()) {
+      FileUtils.cleanDirectory(_tempSegmentDir);
     }
+    _dataFileDir = new File(_tempSegmentDir, "dataFile");
+    FileUtils.forceMkdir(_dataFileDir);
+    _segmentTarDir = new File(_tempSegmentDir, "segmentTar");
+    FileUtils.forceMkdir(_segmentTarDir);
+
+    _handler = new FileHandler(_dataFileDir.getPath(), "data", ".json", MAX_FILE_SIZE);
+    _handler.open(true);
+
+    _fileSystem = FileSystem.get(job.getConfiguration());
+    _outputDir = FileOutputFormat.getOutputPath(job);
   }
 
   @Override
-  public void write(K key, V value)
-      throws IOException, InterruptedException {
-    PinotRecord record = _pinotRecordSerialization.serialize(value);
-    _handler.write(record.toBytes());
+  public void write(NullWritable key, T value)
+      throws IOException {
+    _handler.write(JsonUtils.objectToBytes(_fieldExtractor.extractFields(value)));
   }
 
   @Override
   public void close(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    _pinotRecordSerialization.close();
+      throws IOException {
     _handler.close();
-    File dir = new File(_baseDataDir);
-    _segmentConfig.setSegmentName(PinotOutputFormat.getSegmentName(context));
-    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    for (File f : dir.listFiles()) {
-      createSegment(f.getPath(), driver);
+
+    File[] dataFiles = _dataFileDir.listFiles();
+    assert dataFiles != null;
+    int numDataFiles = dataFiles.length;
+    for (int i = 0; i < numDataFiles; i++) {
+      createSegment(dataFiles[i], i);
     }
 
-    final FileSystem fs = FileSystem.get(new Configuration());
-    String localSegmentPath = new File(_segmentConfig.getOutDir(), _segmentConfig.getSegmentName()).getAbsolutePath();
-    String localTarPath = getLocalTarFile(PinotOutputFormat.getTempSegmentDir(context));
-    LOGGER.info("Trying to tar the segment to: {}", localTarPath);
-    TarGzCompressionUtils.createTarGzOfDirectory(localSegmentPath, localTarPath);
-    String hdfsTarPath =
-        _workDir + "/segmentTar/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
-
-    LOGGER.info("*********************************************************************");
-    LOGGER.info("Copy from : {} to {}", localTarPath, hdfsTarPath);
-    LOGGER.info("*********************************************************************");
-    fs.copyFromLocalFile(true, true, new Path(localTarPath), new Path(hdfsTarPath));
-    clean(PinotOutputFormat.getTempSegmentDir(context));
+    FileUtils.deleteDirectory(_tempSegmentDir);
   }
 
-  private void createSegment(String inputFile, SegmentIndexCreationDriver driver) {
+  private void createSegment(File dataFile, int sequenceId)
+      throws IOException {
+    LOGGER.info("Creating segment from data file: {} of sequence id: {}", dataFile, sequenceId);
+    _segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
+    _segmentGeneratorConfig.setSequenceId(sequenceId);
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
     try {
-      _segmentConfig.setInputFilePath(inputFile);
-      driver.init(_segmentConfig);
+      driver.init(_segmentGeneratorConfig);
       driver.build();
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      throw new IllegalStateException("Caught exception while creating segment from data file: " + dataFile);
     }
-  }
+    String segmentName = driver.getSegmentName();
+    File indexDir = driver.getOutputDirectory();
+    LOGGER.info("Created segment: {} from data file: {} into directory: {}", segmentName, dataFile, indexDir);
 
-  private String getLocalTarFile(String baseDir) {
-    String localTarDir = baseDir + "/segmentTar";
-    File f = new File(localTarDir);
-    if (!f.exists()) {
-      f.mkdirs();
-    }
-    return localTarDir + "/" + _segmentConfig.getSegmentName() + JobConfigConstants.TAR_GZ_FILE_EXT;
-  }
+    File segmentTarFile = new File(_segmentTarDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    LOGGER.info("Tarring segment: {} from directory: {} to: {}", segmentName, indexDir, segmentTarFile);
+    TarGzCompressionUtils.createTarGzOfDirectory(indexDir.getPath(), segmentTarFile.getPath());
 
-  /**
-   * delete the temp files
-   */
-  private void clean(String baseDir) {
-    File f = new File(baseDir);
-    if (f.exists()) {
-      f.delete();
-    }
+    Path hdfsSegmentTarPath = new Path(_outputDir, segmentTarFile.getName());
+    LOGGER.info("Copying segment tar file from local: {} to HDFS: {}", segmentTarFile, hdfsSegmentTarPath);
+    _fileSystem.copyFromLocalFile(true, new Path(segmentTarFile.getPath()), hdfsSegmentTarPath);
+
+    LOGGER
+        .info("Finish creating segment: {} from data file: {} of sequence id: {} into HDFS: {}", segmentName, dataFile,
+            sequenceId, hdfsSegmentTarPath);
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java
index e0f5dbe..8b3f14b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/PinotOutputFormatTest.java
@@ -20,150 +20,111 @@ package org.apache.pinot.hadoop.io;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 
 
 public class PinotOutputFormatTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "PinotOutputFormatTest");
+  private static final String RAW_TABLE_NAME = "testTable";
 
-  private TaskAttemptContext fakeTaskAttemptContext;
-  private Job job;
-  private Configuration conf;
-  private PinotOutputFormat outputFormat;
-  private File outputTempDir;
-  private String segmentTarPath;
-
-  private void init(String indexType)
-      throws IOException {
-    conf = new Configuration();
-    job = Job.getInstance(conf);
-    fakeTaskAttemptContext = mock(TaskAttemptContext.class);
-    outputFormat = new JsonPinotOutputFormat();
-    outputTempDir =
-        Files.createTempDirectory(PinotOutputFormatTest.class.getName() + indexType + "_io_output").toFile();
-    File workingTempDir =
-        Files.createTempDirectory(PinotOutputFormatTest.class.getName() + indexType + "_io_working_dir").toFile();
-    // output path
-    Path outDir = new Path(outputTempDir.getAbsolutePath());
-    PinotOutputFormat.setOutputPath(job, outDir);
-    PinotOutputFormat.setTableName(job, "emp");
-    PinotOutputFormat.setSegmentName(job, indexType + "segment_one");
-    PinotOutputFormat.setTimeColumnName(job, "epochDays");
-    PinotOutputFormat.setTempSegmentDir(job, workingTempDir.getAbsolutePath());
-
-    Schema schema = Schema.fromString(getSchema());
-    PinotOutputFormat.setSchema(job, schema);
-    mockTaskAttemptContext(indexType);
-    segmentTarPath =
-        "_temporary/0/_temporary/attempt_foo_task_" + indexType + "_0123_r_000002_2/part-r-00002/segmentTar";
+  @BeforeClass
+  public void setUp() {
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 
-  private void mockTaskAttemptContext(String indexType) {
-    TaskAttemptID fakeTaskId = new TaskAttemptID(new TaskID("foo_task_" + indexType, 123, TaskType.REDUCE, 2), 2);
-    when(fakeTaskAttemptContext.getTaskAttemptID()).thenReturn(fakeTaskId);
-    when(fakeTaskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.forceDelete(TEMP_DIR);
   }
 
   @Test
-  public void verifyRawIndex()
-      throws Exception {
-    runPinotOutputFormatTest("raw");
-  }
-
-  private void runPinotOutputFormatTest(String indexType)
-      throws Exception {
-    init(indexType);
-    Map<Integer, Emp> inputMap = addTestData();
-    validate(inputMap);
-  }
-
-  private void validate(Map<Integer, Emp> inputMap)
+  public void testPinotOutputFormat()
       throws Exception {
-    File segmentTarOutput = new File(outputTempDir, segmentTarPath);
-    File untarOutput = Files.createTempDirectory(PinotOutputFormatTest.class.getName() + "_segmentUnTar").toFile();
-    for (File tarFile : segmentTarOutput.listFiles()) {
-      TarGzCompressionUtils.unTar(tarFile, untarOutput);
+    Job job = Job.getInstance();
+    File outputDir = new File(TEMP_DIR, "output");
+    File tempSegmentDir = new File(TEMP_DIR, "tempSegment");
+    PinotOutputFormat.setOutputPath(job, new Path(outputDir.getAbsolutePath()));
+    PinotOutputFormat.setTempSegmentDir(job, tempSegmentDir.getAbsolutePath());
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    PinotOutputFormat.setTableConfig(job, tableConfig);
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("id", FieldSpec.DataType.INT)
+            .addSingleValueDimension("name", FieldSpec.DataType.STRING).addMetric("salary", FieldSpec.DataType.INT)
+            .build();
+    PinotOutputFormat.setSchema(job, schema);
+    PinotOutputFormat.setFieldExtractorClass(job, JsonBasedFieldExtractor.class);
+
+    TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+    when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+    PinotRecordWriter<Employee> pinotRecordWriter = PinotOutputFormat.getPinotRecordWriter(taskAttemptContext);
+    int numRecords = 10;
+    List<Employee> records = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      Employee employee = new Employee(i, "name" + i, 1000 * i);
+      pinotRecordWriter.write(null, employee);
+      records.add(employee);
     }
-
-    File outputDir = new File(untarOutput, PinotOutputFormat.getSegmentName(fakeTaskAttemptContext));
-    RecordReader recordReader = new PinotSegmentRecordReader(outputDir, Schema.fromString(getSchema()), null);
-    Map<Integer, GenericRow> resultMap = new HashMap<>();
-    while (recordReader.hasNext()) {
+    pinotRecordWriter.close(taskAttemptContext);
+
+    String segmentName = RAW_TABLE_NAME + "_0";
+    File segmentDir = new File(TEMP_DIR, "segment");
+    TarGzCompressionUtils
+        .unTar(new File(outputDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), segmentDir);
+    File indexDir = new File(segmentDir, segmentName);
+    RecordReader recordReader = new PinotSegmentRecordReader(indexDir, null, null);
+    for (Employee record : records) {
       GenericRow row = recordReader.next();
-      resultMap.put((Integer) row.getValue("id"), row);
-    }
-
-    Assert.assertEquals(resultMap.size(), inputMap.size());
-    Assert.assertEquals(resultMap.get(8).getValue("name"), inputMap.get(8).name);
-  }
-
-  private Map<Integer, Emp> addTestData()
-      throws IOException, InterruptedException {
-    int days = 2000;
-    int sal = 20;
-    RecordWriter<Object, Emp> writer = outputFormat.getRecordWriter(fakeTaskAttemptContext);
-    Map<Integer, Emp> inputMap = new HashMap<>();
-    for (int i = 0; i < 10; i++) {
-      String name = "name " + i;
-      Emp e = new Emp(i, name, days + i, sal + i);
-      writer.write(null, e);
-      inputMap.put(i, e);
+      assertEquals(row.getValue("id"), record.getId());
+      assertEquals(row.getValue("name"), record.getName());
+      assertEquals(row.getValue("salary"), record.getSalary());
     }
-    writer.close(fakeTaskAttemptContext);
-    return inputMap;
+    assertFalse(recordReader.hasNext());
   }
 
-  private static class Emp implements Serializable {
+  public static class Employee {
+    private final int _id;
+    private final String _name;
+    private final int _salary;
 
-    public int id;
-    public String name;
-    public int epochDays;
-    public int salary;
+    private Employee(int id, String name, int salary) {
+      _id = id;
+      _name = name;
+      _salary = salary;
+    }
 
-    public Emp(int id, String name, int epochDays, int salary) {
-      this.id = id;
-      this.name = name;
-      this.epochDays = epochDays;
-      this.salary = salary;
+    public int getId() {
+      return _id;
     }
 
-    @Override
-    public String toString() {
-      return "{\"Emp\":{" + "                        \"id\":\"" + id + "\"" + ",                         \"name\":\""
-          + name + "\"" + ",                         \"epochDays\":\"" + epochDays + "\""
-          + ",                         \"salary\":\"" + salary + "\"" + "}}";
+    public String getName() {
+      return _name;
     }
-  }
 
-  private String getSchema() {
-    return "{\n" + "  \"dimensionFieldSpecs\" : [\n" + "    {\n" + "      \"name\": \"id\",\n"
-        + "      \"dataType\" : \"INT\",\n" + "      \"delimiter\" : null,\n" + "      \"singleValueField\" : true\n"
-        + "    },\n" + "    {\n" + "      \"name\": \"name\",\n" + "      \"dataType\" : \"STRING\",\n"
-        + "      \"delimiter\" : null,\n" + "      \"singleValueField\" : true\n" + "    }\n" + "  ],\n"
-        + "  \"timeFieldSpec\" : {\n" + "    \"incomingGranularitySpec\" : {\n" + "      \"timeType\" : \"DAYS\",\n"
-        + "      \"dataType\" : \"INT\",\n" + "      \"name\" : \"epochDays\"\n" + "    }\n" + "  },\n"
-        + "  \"metricFieldSpecs\" : [\n" + "    {\n" + "      \"name\" : \"salary\",\n"
-        + "      \"dataType\" : \"INT\",\n" + "      \"delimiter\" : null,\n" + "      \"singleValueField\" : true\n"
-        + "    }\n" + "   ],\n" + "  \"schemaName\" : \"emp\"\n" + "}";
+    public int getSalary() {
+      return _salary;
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org