Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2017/11/08 08:24:33 UTC

[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149600151
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.streaming;
    +
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +import org.apache.carbondata.core.util.path.CarbonStorePath;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +import org.apache.carbondata.format.FileHeader;
    +import org.apache.carbondata.processing.loading.BadRecordsLogger;
    +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.loading.DataField;
    +import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
    +import org.apache.carbondata.processing.loading.converter.RowConverter;
    +import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.loading.parser.RowParser;
    +import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
    +import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
    +import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskID;
    +
    +/**
    + * Stream record writer
    + */
    +public class CarbonStreamRecordWriter extends RecordWriter {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
    +
    +  // basic info
    +  private Configuration hadoopConf;
    +  private CarbonDataLoadConfiguration configuration;
    +  private String segmentId;
    +  private int taskNo;
    +  private CarbonTable carbonTable;
    +  private int maxRowNums;
    +  private int maxCacheSize;
    +
    +  // parser and converter
    +  private RowParser rowParser;
    +  private RowConverter converter;
    +  private CarbonRow currentRow = new CarbonRow(null);
    +
    +  // encoder
    +  private DataField[] dataFields;
    +  private BitSet nullBitSet;
    +  private boolean[] isNoDictionaryDimensionColumn;
    +  private int dimensionWithComplexCount;
    +  private int measureCount;
    +  private int[] measureDataTypes;
    +  private StreamBlockletWriter output = null;
    +
    +  // data write
    +  private String segmentDir;
    +  private String fileName;
    +  private DataOutputStream outputStream;
    +  private boolean isFirstRow = true;
    +  private boolean hasException = false;
    +
    +  CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
    +    initialize(job);
    +  }
    +
    +  /**
    +   * Initialize the writer: read the load model and stream settings from the task
    +   * configuration, then attempt to recover the data file of a previous failed attempt.
    +   */
    +  private void initialize(TaskAttemptContext job) throws IOException {
    +    // set basic information
    +    hadoopConf = job.getConfiguration();
    +    CarbonLoadModel carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
    +    if (carbonLoadModel == null) {
    +      throw new IOException(
    +          "CarbonStreamRecordWriter requires configuration: mapreduce.output.carbon.load.model");
    +    }
    +    segmentId = carbonLoadModel.getSegmentId();
    +    carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
    +    taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
    +    carbonLoadModel.setTaskNo("" + taskNo);
    +    configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
    +    maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
    +        CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
    +    maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
    +        CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
    +    // first, try to recover the data file left by a previously failed attempt of this task
    +    tryRecoverFromFault();
    +  }
    +
    +  /**
    +   * Try to recover this task's data file from a previous fault.
    +   */
    +  private void tryRecoverFromFault() throws IOException {
    +    CarbonTablePath tablePath =
    +        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
    +    segmentDir = tablePath.getSegmentDir("0", segmentId);
    +    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
    +    String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    +    CarbonStreamRecordWriter.recoverDataFile(segmentDir, fileName, indexName);
    +  }
    +
    +  public static void recoverDataFile(String segmentDir, String fileName, String indexName)
    --- End diff ---
    
    Better to keep the file-recovery and segment-recovery handling functions in the same class.
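    
    As an illustration of that suggestion, here is a minimal sketch of a single helper class owning both the file-level and the segment-level recovery entry points, with the record writer delegating to it. The class name, the segment-level method, and the stub bodies are hypothetical (the recovery logic itself is not visible in this diff hunk), so treat it as a shape for the refactoring rather than working code from the PR.
    
        import java.io.IOException;
    
        // Hypothetical helper, not part of this PR: keeps file recovery and
        // segment recovery side by side in one class, as suggested above.
        public final class StreamSegmentRecovery {
    
          private StreamSegmentRecovery() {
            // static utility class, no instances
          }
    
          // File-level recovery: truncate a partially written stream data file
          // back to the last blocklet committed in the stream index file.
          public static void recoverDataFile(String segmentDir, String fileName, String indexName)
              throws IOException {
            // ... move the body of CarbonStreamRecordWriter.recoverDataFile here ...
          }
    
          // Segment-level recovery: walk the segment directory and apply the
          // file-level recovery to every stream data file it contains.
          public static void recoverSegment(String segmentDir, String indexName)
              throws IOException {
            // ... list the data files under segmentDir and call recoverDataFile for each ...
          }
        }
    
    CarbonStreamRecordWriter.tryRecoverFromFault() would then delegate to StreamSegmentRecovery.recoverDataFile(segmentDir, fileName, indexName), and a segment-level recovery path (for example, when a segment is re-opened after a failure) could reuse recoverSegment() from the same class instead of duplicating the file-repair logic.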


---