You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2017/11/08 08:24:33 UTC
[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149600151
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Stream record writer
+ */
+public class CarbonStreamRecordWriter extends RecordWriter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
+
+ // basic info
+ // job configuration taken from the TaskAttemptContext in initialize()
+ private Configuration hadoopConf;
+ // load configuration built from the CarbonLoadModel by DataLoadProcessBuilder
+ private CarbonDataLoadConfiguration configuration;
+ // segment id reported by the CarbonLoadModel
+ private String segmentId;
+ // numeric id of the Hadoop TaskID parsed from the "mapred.tip.id" property
+ private int taskNo;
+ // table obtained from the load model's CarbonDataLoadSchema
+ private CarbonTable carbonTable;
+ // configured CARBON_STREAM_BLOCKLET_ROW_NUMS value minus one
+ private int maxRowNums;
+ // value of the CARBON_STREAM_CACHE_SIZE setting
+ private int maxCacheSize;
+
+ // parser and converter
+ private RowParser rowParser;
+ private RowConverter converter;
+ private CarbonRow currentRow = new CarbonRow(null);
+
+ // encoder
+ private DataField[] dataFields;
+ private BitSet nullBitSet;
+ private boolean[] isNoDictionaryDimensionColumn;
+ private int dimensionWithComplexCount;
+ private int measureCount;
+ private int[] measureDataTypes;
+ private StreamBlockletWriter output = null;
+
+ // data write
+ // segment directory resolved from the table path in tryRecoverFromFault()
+ private String segmentDir;
+ // carbon data file name derived from taskNo in tryRecoverFromFault()
+ private String fileName;
+ private DataOutputStream outputStream;
+ private boolean isFirstRow = true;
+ private boolean hasException = false;
+
+ /**
+  * Creates a stream record writer and initializes it from the given task attempt context.
+  *
+  * @param job task attempt context handed to {@code initialize}
+  * @throws IOException if initialization fails
+  */
+ CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
+   this.initialize(job);
+ }
+
+ /**
+  * Reads the load model and the streaming settings from the job configuration, then tries to
+  * recover this task's data file from a previous fault.
+  *
+  * @param job task attempt context whose configuration supplies the load model and task id
+  * @throws IOException if the load model or the "mapred.tip.id" property is missing from the
+  *                     configuration, or if the recovery step fails
+  */
+ private void initialize(TaskAttemptContext job) throws IOException {
+   // set basic information
+   hadoopConf = job.getConfiguration();
+   CarbonLoadModel carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+   if (carbonLoadModel == null) {
+     throw new IOException(
+         "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
+   }
+   segmentId = carbonLoadModel.getSegmentId();
+   carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+   // TaskID.forName(null) returns null, so check the property first and fail with a clear
+   // message instead of a NullPointerException on getId()
+   String taskIdString = hadoopConf.get("mapred.tip.id");
+   if (taskIdString == null) {
+     throw new IOException("CarbonStreamRecordWriter require configuration: mapred.tip.id");
+   }
+   taskNo = TaskID.forName(taskIdString).getId();
+   carbonLoadModel.setTaskNo(String.valueOf(taskNo));
+   configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
+   // the configured blocklet row count is stored minus one
+   maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
+       CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
+   maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
+       CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
+   // try recover data file from fault for task at first
+   tryRecoverFromFault();
+ }
+
+ /**
+  * Resolves this task's segment directory and data file name, then delegates to
+  * {@code recoverDataFile} together with the stream index file name.
+  *
+  * @throws IOException if the recovery step fails
+  */
+ private void tryRecoverFromFault() throws IOException {
+   this.segmentDir = CarbonStorePath
+       .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier())
+       .getSegmentDir("0", segmentId);
+   this.fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+   recoverDataFile(segmentDir, fileName, CarbonTablePath.getCarbonStreamIndexFileName());
+ }
+
+ public static void recoverDataFile(String segmentDir, String fileName, String indexName)
--- End diff --
It would be better to keep the file recovery and segment recovery handling functions in the same class.
---