Posted to issues@flink.apache.org by zhangminglei <gi...@git.apache.org> on 2018/05/25 05:04:36 UTC

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

GitHub user zhangminglei opened a pull request:

    https://github.com/apache/flink/pull/6075

    [FLINK-9407] [hdfs connector] Support orc rolling sink writer

    ## What is the purpose of the change
    In production environments, we often write files in ORC format, so this PR adds an ORC writer for the BucketingSink. Currently, only basic data types are supported.
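    
    For illustration, a minimal usage sketch (not part of this change) showing how such a writer is meant to be plugged into the existing ```BucketingSink```; the output path and the schema string below are placeholders:
    
    ```java
    import org.apache.flink.orc.OrcFileWriter;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.types.Row;
    import org.apache.orc.CompressionKind;
    
    public class OrcSinkUsageSketch {
        public static void attachOrcSink(DataStream<Row> rows) {
            // Bucketing sink writing ORC files under a placeholder base path.
            BucketingSink<Row> sink = new BucketingSink<>("hdfs:///tmp/orc-output");
            // The schema string uses the ORC TypeDescription syntax expected by OrcFileWriter.
            sink.setWriter(new OrcFileWriter<>("struct<x:int,y:string>", CompressionKind.ZLIB));
            rows.addSink(sink);
        }
    }
    ```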
    
    ## Brief change log
    Adds ```OrcFileWriter``` and its test file.
    
    ## Verifying this change
    This change is verified by ```OrcFileWriterTest```.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: ( no )
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not yet documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhangminglei/flink flink-9407-orc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6075.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6075
    
----
commit d653d39f6cb74075874eb890cab36012d85dbecf
Author: zhangminglei <zm...@...>
Date:   2018-05-25T04:58:01Z

    [FLINK-9407] [hdfs connector] Support orc rolling sink writer

----


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200830078
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    --- End diff --
    
    How do you handle NULL values?
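    
    For context, a hedged sketch of one way NULL cells could be marked with the Hive ```ColumnVector``` API (the ```noNulls``` flag and ```isNull``` array are part of that API; the class and helper names below are hypothetical, not from this PR):
    
    ```java
    import org.apache.flink.types.Row;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    
    final class NullHandlingSketch {
    
        // Marks a single cell as NULL; the ORC writer only consults isNull when noNulls is false.
        static void setNull(ColumnVector col, int row) {
            col.isNull[row] = true;
            col.noNulls = false;
        }
    
        // A possible guard at the top of the per-field dispatch: returns true if the field was NULL.
        static boolean handledNull(VectorizedRowBatch batch, Row record, int index, int row) {
            if (record.getField(index) == null) {
                setNull(batch.cols[index], row);
                return true;
            }
            return false;
        }
    }
    ```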


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200888235
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    +			switch (typeInfo.toString()) {
    +				case "Long":
    +					LongColumnVector longColumnVector = (LongColumnVector) batch.cols[index];
    +					longColumnVector.vector[nextPosition] = (Long) record.getField(index);
    +					break;
    +				case "Boolean":
    +					LongColumnVector booleanColumnVector = (LongColumnVector) batch.cols[index];
    +					Boolean bool = (Boolean) record.getField(index);
    +					int boolValue;
    +					if (bool) {
    +						boolValue = 1;
    +					} else {
    +						boolValue = 0;
    +					}
    +					booleanColumnVector.vector[nextPosition] = boolValue;
    +					break;
    +				case "Short":
    +					LongColumnVector shortColumnVector = (LongColumnVector) batch.cols[index];
    +					shortColumnVector.vector[nextPosition] = (Short) record.getField(index);
    +					break;
    +				case "Integer":
    +					LongColumnVector intColumnVector = (LongColumnVector) batch.cols[index];
    +					intColumnVector.vector[nextPosition] = (Integer) record.getField(index);
    +					break;
    +				case "Float":
    +					DoubleColumnVector floatColumnVector = (DoubleColumnVector) batch.cols[index];
    +					floatColumnVector.vector[nextPosition] = (Float) record.getField(index);
    +					break;
    +				case "Double":
    +					DoubleColumnVector doubleColumnVector = (DoubleColumnVector) batch.cols[index];
    +					doubleColumnVector.vector[nextPosition] = (Double) record.getField(index);
    +					break;
    +				case "String":
    +					BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
    +					stringColumnVector.setVal(nextPosition, ((String) record.getField(index)).getBytes(StandardCharsets.UTF_8));
    +					break;
    +				default:
    --- End diff --
    
    I will add the other types in the next couple of days.
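    
    For reference, a hedged sketch of how one more basic type could be slotted into the existing switch (the "Byte" case follows the same ```TypeInformation#toString``` naming convention as the cases already in the diff; the class and method names are hypothetical):
    
    ```java
    import org.apache.flink.types.Row;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    
    final class ExtraTypeCaseSketch {
    
        // ORC stores tinyint values in a LongColumnVector, so a "Byte" case can reuse the long path.
        static void setByte(VectorizedRowBatch batch, int index, int nextPosition, Row record) {
            LongColumnVector byteColumnVector = (LongColumnVector) batch.cols[index];
            byteColumnVector.vector[nextPosition] = (Byte) record.getField(index);
        }
    }
    ```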


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r204232617
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    --- End diff --
    
    With the current implementation, I think ```T record``` could contain NULL values.


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200888987
  
    --- Diff: flink-connectors/flink-orc/pom.xml ---
    @@ -54,6 +54,14 @@ under the License.
     			<optional>true</optional>
     		</dependency>
     
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
    +			<version>${project.version}</version>
    +			<!-- Projects depending on this project, won't depend on flink-filesystem. -->
    +			<optional>true</optional>
    +		</dependency>
    +
     		<dependency>
     			<groupId>org.apache.orc</groupId>
     			<artifactId>orc-core</artifactId>
    --- End diff --
    
    Yes. We can upgrade it. Will update.


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200892031
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    +			switch (typeInfo.toString()) {
    +				case "Long":
    +					LongColumnVector longColumnVector = (LongColumnVector) batch.cols[index];
    +					longColumnVector.vector[nextPosition] = (Long) record.getField(index);
    +					break;
    +				case "Boolean":
    +					LongColumnVector booleanColumnVector = (LongColumnVector) batch.cols[index];
    +					Boolean bool = (Boolean) record.getField(index);
    +					int boolValue;
    +					if (bool) {
    +						boolValue = 1;
    +					} else {
    +						boolValue = 0;
    +					}
    +					booleanColumnVector.vector[nextPosition] = boolValue;
    +					break;
    +				case "Short":
    +					LongColumnVector shortColumnVector = (LongColumnVector) batch.cols[index];
    +					shortColumnVector.vector[nextPosition] = (Short) record.getField(index);
    +					break;
    +				case "Integer":
    +					LongColumnVector intColumnVector = (LongColumnVector) batch.cols[index];
    +					intColumnVector.vector[nextPosition] = (Integer) record.getField(index);
    +					break;
    +				case "Float":
    +					DoubleColumnVector floatColumnVector = (DoubleColumnVector) batch.cols[index];
    +					floatColumnVector.vector[nextPosition] = (Float) record.getField(index);
    +					break;
    +				case "Double":
    +					DoubleColumnVector doubleColumnVector = (DoubleColumnVector) batch.cols[index];
    +					doubleColumnVector.vector[nextPosition] = (Double) record.getField(index);
    +					break;
    +				case "String":
    +					BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
    +					stringColumnVector.setVal(nextPosition, ((String) record.getField(index)).getBytes(StandardCharsets.UTF_8));
    +					break;
    +				default:
    +					throw new IllegalArgumentException("Unsupported column type " + typeInfo);
    --- End diff --
    
    Yeah. We should also check it in the constructor so that users can catch problems early. Will update later.
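    
    For context, a hedged sketch of what such an early check could look like (the supported set mirrors the switch in the diff; the class and method names are hypothetical and the schema is assumed to be a struct):
    
    ```java
    import org.apache.orc.TypeDescription;
    
    import java.util.EnumSet;
    import java.util.Set;
    
    final class SchemaValidationSketch {
    
        // The ORC categories the current switch can handle (assumption based on the diff).
        private static final Set<TypeDescription.Category> SUPPORTED = EnumSet.of(
            TypeDescription.Category.BOOLEAN, TypeDescription.Category.SHORT,
            TypeDescription.Category.INT, TypeDescription.Category.LONG,
            TypeDescription.Category.FLOAT, TypeDescription.Category.DOUBLE,
            TypeDescription.Category.STRING);
    
        // Could be called from the OrcFileWriter constructor so that an unsupported
        // column type fails when the job is built rather than inside write().
        static void validate(TypeDescription structSchema) {
            for (TypeDescription child : structSchema.getChildren()) {
                if (!SUPPORTED.contains(child.getCategory())) {
                    throw new IllegalArgumentException("Unsupported column type " + child);
                }
            }
        }
    }
    ```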


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200830153
  
    --- Diff: flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcFileWriterTest.java ---
    @@ -0,0 +1,40 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for {@link OrcFileWriter}.
    + */
    +public class OrcFileWriterTest {
    +
    +	@Test
    +	public void testDuplicate() {
    +		OrcFileWriter<Row> writer = new OrcFileWriter<Row>("struct<x:int,y:int>");
    --- End diff --
    
    Need UTs for writing Orc files with all supported types. Also include negative cases, and cases where OrcBatchWriter.fill() returns false.
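    
    As a starting point, a hedged sketch of one such test (it relies only on the ```duplicate()``` and ```equals()``` shown in the diff; the all-types and fill()-overflow tests would additionally need a local file system setup and are not sketched here):
    
    ```java
    import org.apache.flink.orc.OrcFileWriter;
    import org.apache.flink.streaming.connectors.fs.Writer;
    import org.apache.flink.types.Row;
    
    import org.junit.Test;
    
    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertNotSame;
    
    /**
     * Sketch of an additional test for {@link OrcFileWriter#duplicate()}.
     */
    public class OrcFileWriterDuplicateTest {
    
        @Test
        public void testDuplicateIsEqualButNotSame() {
            OrcFileWriter<Row> writer = new OrcFileWriter<>("struct<x:int,y:string>");
            Writer<Row> copy = writer.duplicate();
            // duplicate() should produce an independent writer with the same configuration.
            assertEquals(writer, copy);
            assertNotSame(writer, copy);
        }
    }
    ```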


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r204233444
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java ---
    @@ -0,0 +1,252 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.fs;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	private TypeDescription schema;
    +
    +	private String meatSchema;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private VectorizedRowBatch rowBatch;
    +
    +	private CompressionKind compressionKind;
    +
    +	private long writedRowSize;
    +
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of a orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.meatSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(meatSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, meatSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(meatSchema, writer.meatSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    --- End diff --
    
    Thanks @sagarl. I will fix the ```NULL``` handling.


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200830013
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    --- End diff --
    
    So when rowBatch is full, you basically throw away the record that was supposed to be added?
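    
    For context, a hedged sketch of a ```write()``` that would not drop the element when the batch is full (field and method names follow the diff; this is a sketch, not the PR's final code):
    
    ```java
    @Override
    public void write(T element) throws IOException {
        if (!orcBatchWriter.fill(rowBatch, element)) {
            // The batch was full: write it out, reset it, then retry the same element
            // so that no record is lost.
            writer.addRowBatch(rowBatch);
            writedRowSize += rowBatch.size;
            rowBatch.reset();
            orcBatchWriter.fill(rowBatch, element);
        }
    }
    ```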


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    Hi @zhangminglei 
    Sorry for the late response - I thought about this solution quite a bit and came to the conclusion that we may need to do a bit more for efficient results:
    
    Please take a look at [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) and the subtask [FLINK-9753](https://issues.apache.org/jira/browse/FLINK-9753)
    The description outlines why I believe the simple approach suggested here may not be enough (it will frequently result in badly compressed ORC/Parquet files).
    
    We have already started this effort to completely redesign the BucketingSink. The initial work-in-progress looks quite promising.


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by yuruiz <gi...@git.apache.org>.
Github user yuruiz commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r201630927
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    --- End diff --
    
    This call does not guarantee that the data is persisted. It might still sit in memory and get lost in case of a failover.
    
    Another problem is that even if the data is persisted, the ORC file might be unreadable, since no proper file footer has been written yet. A correct flush should make sure that the data has been persisted and that a proper file footer has been created.
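    As an illustration only, a flush() along these lines could address both points, assuming an ORC release whose `org.apache.orc.Writer` exposes `writeIntermediateFooter()` (an assumption about the ORC version, not something this PR currently relies on):
    
    ```java
    @Override
    public long flush() throws IOException {
        // Sketch, not the PR's code: push the pending batch first.
        if (rowBatch.size > 0) {
            writer.addRowBatch(rowBatch);
            writedRowSize += rowBatch.size;
            rowBatch.reset();
        }
        // Ask the ORC writer to write an intermediate footer so the bytes
        // written so far form a readable file even before close().
        return writer.writeIntermediateFooter();
    }
    ```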


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    @zhangminglei  For your interest - there is a new bucketing sink in the Flink master (called `StreamingFileSink`) with a different design: it manages all state in Flink state (so it is consistent), uses a new file system writer abstraction to generalize across HDFS, POSIX, and S3 (S3 is still WIP), and offers a more pluggable way to add encoders, such as Parquet and ORC.
    
    As an example, we added a Parquet writer, which is quite straightforward and flexible with the new interface.
    
    It would be great to get your opinion on that and to see whether your ORC writer code also works with it.
    If it works out, the new StreamingFileSink could replace the current BucketingSink.
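    For readers following along, a rough sketch of what the new API looks like with a bulk format (assuming the `flink-parquet` module's `ParquetAvroWriters` factory, an Avro `Schema schema`, and a `DataStream<GenericRecord> stream` already defined; an ORC writer would plug in the same way once a BulkWriter factory for ORC exists):
    
    ```java
    // Sketch: wire a bulk-encoded format into the new StreamingFileSink.
    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs:///data/out"),                   // example output path
            ParquetAvroWriters.forGenericRecord(schema))    // bulk writer factory from flink-parquet
        .build();
    stream.addSink(sink);
    ```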


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    Thanks @hequn8128. You are absolutely right! It is more reasonable to put the ORC-related classes there, or, as @StephanEwen said, at the very least make them optional. As for implementing ```CheckpointedFunction```, I do not think it is necessary, since ```BucketingSink``` already implements that and we simply use the ORC writer like ```bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString))```. Also, ```SequenceFileWriter``` and ```StringWriter``` both do not implement ```CheckpointedFunction```.
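    A minimal wiring sketch of that usage (the base path and the schema string are placeholder examples, not from the PR):
    
    ```java
    // Sketch: plug the ORC writer from this PR into the existing BucketingSink.
    BucketingSink<Row> sink = new BucketingSink<>("hdfs:///data/orc-out");
    sink.setWriter(new OrcFileWriter<>("struct<name:string,age:int>"));
    rowStream.addSink(sink);   // rowStream: an existing DataStream<Row>
    ```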
    



---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by wgtmac <gi...@git.apache.org>.
Github user wgtmac commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200879624
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    --- End diff --
    
    Does this only consider the first-level schema?


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by sagarl <gi...@git.apache.org>.
Github user sagarl commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200217021
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java ---
    @@ -0,0 +1,252 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.fs;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	private TypeDescription schema;
    +
    +	private String meatSchema;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private VectorizedRowBatch rowBatch;
    +
    +	private CompressionKind compressionKind;
    +
    +	private long writedRowSize;
    +
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of a orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.meatSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(meatSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, meatSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(meatSchema, writer.meatSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    +			switch (typeInfo.toString()) {
    +				case "Long":
    +					LongColumnVector longColumnVector = (LongColumnVector) batch.cols[index];
    +					longColumnVector.vector[nextPosition] = (Long) record.getField(index);
    +					break;
    +				case "Boolean":
    +					LongColumnVector booleanColumnVector = (LongColumnVector) batch.cols[index];
    +					Boolean bool = (Boolean) record.getField(index);
    +					int boolValue;
    +					if (bool) {
    +						boolValue = 1;
    +					} else {
    +						boolValue = 0;
    +					}
    +					booleanColumnVector.vector[nextPosition] = boolValue;
    +					break;
    +				case "Short":
    +					LongColumnVector shortColumnVector = (LongColumnVector) batch.cols[index];
    +					shortColumnVector.vector[nextPosition] = (Short) record.getField(index);
    +					break;
    +				case "Integer":
    +					LongColumnVector intColumnVector = (LongColumnVector) batch.cols[index];
    +					intColumnVector.vector[nextPosition] = (Integer) record.getField(index);
    +					break;
    +				case "Float":
    +					DoubleColumnVector floatColumnVector = (DoubleColumnVector) batch.cols[index];
    +					floatColumnVector.vector[nextPosition] = (Float) record.getField(index);
    +					break;
    +				case "Double":
    +					DoubleColumnVector doubleColumnVector = (DoubleColumnVector) batch.cols[index];
    +					doubleColumnVector.vector[nextPosition] = (Double) record.getField(index);
    +					break;
    +				case "String":
    +					BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
    +					stringColumnVector.setVal(nextPosition, ((String) record.getField(index)).getBytes(StandardCharsets.UTF_8));
    --- End diff --
    
    This conversion fails with 
    
    > java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
    
    It works with 
    
    > stringColumnVector.setVal(nextPosition, ( record.getField(index).toString()).getBytes(StandardCharsets.UTF_8));



---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by sagarl <gi...@git.apache.org>.
Github user sagarl commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200217627
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java ---
    @@ -0,0 +1,252 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.fs;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	private TypeDescription schema;
    +
    +	private String meatSchema;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private VectorizedRowBatch rowBatch;
    +
    +	private CompressionKind compressionKind;
    +
    +	private long writedRowSize;
    +
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of a orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.meatSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(meatSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, meatSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(meatSchema, writer.meatSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    --- End diff --
    
    All the conversions fail with a NullPointerException if `T record` has null values, e.g.:
    
    > BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
       stringColumnVector.setVal(nextPosition, ((String) record.getField(index)).getBytes(StandardCharsets.UTF_8));
    
    The following hack works:
    
    > BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
                        Object strField = record.getField(index);
                        String modifiedStrField = strField == null ? "" : strField.toString();
                        stringColumnVector.setVal(nextPosition, (modifiedStrField).getBytes(StandardCharsets.UTF_8));
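    For completeness, ORC's column vectors also track nulls natively, so a null-aware version of the String case could look like this sketch (not part of the PR; it uses the `noNulls`/`isNull` fields exposed by `ColumnVector`):
    
    ```java
    BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
    Object field = record.getField(index);
    if (field == null) {
        // Mark this position as null instead of writing an empty string.
        stringColumnVector.noNulls = false;
        stringColumnVector.isNull[nextPosition] = true;
    } else {
        stringColumnVector.setVal(nextPosition, field.toString().getBytes(StandardCharsets.UTF_8));
    }
    ```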



---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    If it is not a problem that this can lead to poor compression when checkpoint intervals are short, we could think about merging this as a temporary solution until [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) is fully developed.
    
    I would suggest making the ORC and table dependencies optional, though, so that not every user of the BucketingSink needs to pull in these dependencies.


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    @StephanEwen Could you take a look at this? Thank you ~


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200890648
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    --- End diff --
    
    What do you mean "first level schema" ? This PR does not support complex data structure at the moment.


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    Hi, thanks @StephanEwen. Regarding whether a short checkpoint interval leads to poor compression, I will run an actual production test with compression enabled and attach the results to [FLINK-9407](https://issues.apache.org/jira/browse/FLINK-9407) in the next few days.
    
    Actually, we have been using this PR (not yet polished) in our production environment for a long time, with very good results compared to Spark Streaming. I will attach those test results to [FLINK-9407](https://issues.apache.org/jira/browse/FLINK-9407) as a reference. As for your concern that a short checkpoint interval leads to poor compression, I tend to agree, but we need a test to confirm it. We also all know that the longer the snapshot interval, the lower the performance impact of asynchronous or synchronous snapshotting, so people are not inclined to adopt short checkpoint intervals when they want high throughput and low latency in production. For example, in our UV-calculation jobs the checkpoint interval is 30 seconds. In any case, I will still provide test results with compression enabled.
    
    I suggest merging this PR as a temporary solution to reduce the learning curve: some users already use ```BucketingSink``` in their projects and want this functionality, as we saw on the user mailing list a few days ago, and they may not switch to the new ```StreamingFileSink``` in the short term. That may be one good reason.
    
    By the way, I will look at [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) and the subtask [FLINK-9753](https://issues.apache.org/jira/browse/FLINK-9753) soon and respond in more detail in the next couple of days.


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r204231919
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java ---
    @@ -0,0 +1,252 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.fs;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	private TypeDescription schema;
    +
    +	private String meatSchema;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private VectorizedRowBatch rowBatch;
    +
    +	private CompressionKind compressionKind;
    +
    +	private long writedRowSize;
    +
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of a orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.meatSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(meatSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, meatSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(meatSchema, writer.meatSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    +			switch (typeInfo.toString()) {
    +				case "Long":
    +					LongColumnVector longColumnVector = (LongColumnVector) batch.cols[index];
    +					longColumnVector.vector[nextPosition] = (Long) record.getField(index);
    +					break;
    +				case "Boolean":
    +					LongColumnVector booleanColumnVector = (LongColumnVector) batch.cols[index];
    +					Boolean bool = (Boolean) record.getField(index);
    +					int boolValue;
    +					if (bool) {
    +						boolValue = 1;
    +					} else {
    +						boolValue = 0;
    +					}
    +					booleanColumnVector.vector[nextPosition] = boolValue;
    +					break;
    +				case "Short":
    +					LongColumnVector shortColumnVector = (LongColumnVector) batch.cols[index];
    +					shortColumnVector.vector[nextPosition] = (Short) record.getField(index);
    +					break;
    +				case "Integer":
    +					LongColumnVector intColumnVector = (LongColumnVector) batch.cols[index];
    +					intColumnVector.vector[nextPosition] = (Integer) record.getField(index);
    +					break;
    +				case "Float":
    +					DoubleColumnVector floatColumnVector = (DoubleColumnVector) batch.cols[index];
    +					floatColumnVector.vector[nextPosition] = (Float) record.getField(index);
    +					break;
    +				case "Double":
    +					DoubleColumnVector doubleColumnVector = (DoubleColumnVector) batch.cols[index];
    +					doubleColumnVector.vector[nextPosition] = (Double) record.getField(index);
    +					break;
    +				case "String":
    +					BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
    +					stringColumnVector.setVal(nextPosition, ((String) record.getField(index)).getBytes(StandardCharsets.UTF_8));
    --- End diff --
    
    Hi @sagarl , sorry for my late response - these days I have been busy with other things. Thanks for pointing out this error. I will fix this bug!


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by wgtmac <gi...@git.apache.org>.
Github user wgtmac commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200879243
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    --- End diff --
    
    writedRowSize += rowBatch.size;


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200830833
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    --- End diff --
    
    Thanks @xndai for the review! Here, ```isFill``` returns false if the batch is full. So, when the rowBatch is full, we add the row batch to the ORC file.


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200889912
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    --- End diff --
    
    I will verify the NULL handling and ping you again. Thank you, @xndai ~
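    For reference, a minimal sketch of how NULL values could be marked, assuming the standard Hive `ColumnVector` fields `noNulls` and `isNull`; the helper class and method names are made up for illustration and are not part of this PR:

    ```java
    // Sketch: marking NULL values in an ORC column vector before writing.
    // Uses the noNulls / isNull fields of Hive's ColumnVector.
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    final class NullHandlingSketch {

    	static void setNull(ColumnVector col, int row) {
    		col.noNulls = false;    // the vector now contains at least one NULL
    		col.isNull[row] = true; // mark this row as NULL
    	}

    	static void setLongOrNull(LongColumnVector col, int row, Long value) {
    		if (value == null) {
    			setNull(col, row);
    		} else {
    			col.vector[row] = value;
    		}
    	}
    }
    ```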


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    The dependencies of `flink-connector-filesystem` are not well set up as it is, with both an Avro dependency and a Hadoop dependency. I agree that it would be good not to introduce yet more dependencies, or at the very least to make them optional dependencies.
    
    FYI: In the rework of the BucketingSink under [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749), we want to introduce a `BulkEncoder` interface in `flink-core` that is used by the BucketingSink and can be implemented by classes in `flink-orc` (and later a new `flink-parquet` project). That way we cleanly separate the dependencies of the projects.
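    For context, a rough sketch of what such a `BulkEncoder`-style interface might look like; the name, methods, and signatures below are assumptions for illustration only and are not the actual Flink API:

    ```java
    // Hypothetical sketch only: a bulk-encoding abstraction the bucketing sink
    // could call into, so format-specific writers (ORC, Parquet) live in their
    // own modules. Not the real flink-core interface.
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.Serializable;

    public interface BulkEncoderSketch<T> extends Serializable {

    	void open(OutputStream out) throws IOException;  // called when a part file is opened

    	void encode(T element) throws IOException;       // append one element to the file

    	void flush() throws IOException;                 // flush buffered data, e.g. on checkpoint

    	void close() throws IOException;                 // finish the file (footer, statistics, ...)
    }
    ```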


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200833563
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    --- End diff --
    
    Thanks! I will fix this bug! Very nice question!


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200889601
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    --- End diff --
    
    Thank you very much! I will fix this error!


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by wgtmac <gi...@git.apache.org>.
Github user wgtmac commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200878724
  
    --- Diff: flink-connectors/flink-orc/pom.xml ---
    @@ -54,6 +54,14 @@ under the License.
     			<optional>true</optional>
     		</dependency>
     
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
    +			<version>${project.version}</version>
    +			<!-- Projects depending on this project, won't depend on flink-filesystem. -->
    +			<optional>true</optional>
    +		</dependency>
    +
     		<dependency>
     			<groupId>org.apache.orc</groupId>
     			<artifactId>orc-core</artifactId>
    --- End diff --
    
    The current ORC version is 1.5.x. Should we upgrade it as well?


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by wgtmac <gi...@git.apache.org>.
Github user wgtmac commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200879775
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    +			switch (typeInfo.toString()) {
    +				case "Long":
    +					LongColumnVector longColumnVector = (LongColumnVector) batch.cols[index];
    +					longColumnVector.vector[nextPosition] = (Long) record.getField(index);
    +					break;
    +				case "Boolean":
    +					LongColumnVector booleanColumnVector = (LongColumnVector) batch.cols[index];
    +					Boolean bool = (Boolean) record.getField(index);
    +					int boolValue;
    +					if (bool) {
    +						boolValue = 1;
    +					} else {
    +						boolValue = 0;
    +					}
    +					booleanColumnVector.vector[nextPosition] = boolValue;
    +					break;
    +				case "Short":
    +					LongColumnVector shortColumnVector = (LongColumnVector) batch.cols[index];
    +					shortColumnVector.vector[nextPosition] = (Short) record.getField(index);
    +					break;
    +				case "Integer":
    +					LongColumnVector intColumnVector = (LongColumnVector) batch.cols[index];
    +					intColumnVector.vector[nextPosition] = (Integer) record.getField(index);
    +					break;
    +				case "Float":
    +					DoubleColumnVector floatColumnVector = (DoubleColumnVector) batch.cols[index];
    +					floatColumnVector.vector[nextPosition] = (Float) record.getField(index);
    +					break;
    +				case "Double":
    +					DoubleColumnVector doubleColumnVector = (DoubleColumnVector) batch.cols[index];
    +					doubleColumnVector.vector[nextPosition] = (Double) record.getField(index);
    +					break;
    +				case "String":
    +					BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
    +					stringColumnVector.setVal(nextPosition, ((String) record.getField(index)).getBytes(StandardCharsets.UTF_8));
    +					break;
    +				default:
    --- End diff --
    
    Just curious, are other types not supported in Flink?
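    As a rough illustration, additional basic types could be filled the same way; the sketch below assumes the `TypeInformation` names "Byte" and "Timestamp", a `java.sql.Timestamp` value in the Row, and the use of `TimestampColumnVector`, none of which are part of the current PR:

    ```java
    // Sketch: handling two more basic types when filling the row batch.
    // The helper class is for illustration only.
    import org.apache.flink.types.Row;

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    final class MoreTypesSketch {

    	static void setByte(VectorizedRowBatch batch, int index, int pos, Row record) {
    		LongColumnVector col = (LongColumnVector) batch.cols[index];
    		col.vector[pos] = (Byte) record.getField(index);
    	}

    	static void setTimestamp(VectorizedRowBatch batch, int index, int pos, Row record) {
    		TimestampColumnVector col = (TimestampColumnVector) batch.cols[index];
    		col.set(pos, (java.sql.Timestamp) record.getField(index));
    	}
    }
    ```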


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200831458
  
    --- Diff: flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcFileWriterTest.java ---
    @@ -0,0 +1,40 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests for {@link OrcFileWriter}.
    + */
    +public class OrcFileWriterTest {
    +
    +	@Test
    +	public void testDuplicate() {
    +		OrcFileWriter<Row> writer = new OrcFileWriter<Row>("struct<x:int,y:int>");
    --- End diff --
    
    Yes. I will add more UTs for all of these in the next couple of days.


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    The patch has been updated.


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    Hi @zhangminglei, the main reason to implement `CheckpointedFunction` is that you keep a buffer in your Writer class. It is an in-memory buffer, which will suffer data loss during job failover.


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200830111
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    +			writer.addRowBatch(rowBatch);
    +			writedRowSize += writedRowSize + rowBatch.size;
    +			rowBatch.reset();
    +		}
    +	}
    +
    +	@Override
    +	public long flush() throws IOException {
    +		writer.addRowBatch(rowBatch);
    +		writedRowSize += rowBatch.size;
    +		rowBatch.reset();
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public long getPos() throws IOException {
    +		return writedRowSize;
    +	}
    +
    +	@Override
    +	public Writer<T> duplicate() {
    +		return new OrcFileWriter<>(metaSchema, compressionKind);
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		flush();
    +		if (rowBatch.size != 0) {
    +			writer.addRowBatch(rowBatch);
    +			rowBatch.reset();
    +		}
    +		writer.close();
    +		super.close();
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), schema, metaSchema, rowBatch, orcBatchWriter);
    +	}
    +
    +	@Override
    +	public boolean equals(Object other) {
    +		if (this == other) {
    +			return true;
    +		}
    +		if (other == null) {
    +			return false;
    +		}
    +		if (getClass() != other.getClass()) {
    +			return false;
    +		}
    +		OrcFileWriter<T> writer = (OrcFileWriter<T>) other;
    +		return Objects.equals(schema, writer.schema)
    +			&& Objects.equals(metaSchema, writer.metaSchema)
    +			&& Objects.equals(rowBatch, writer.rowBatch)
    +			&& Objects.equals(orcBatchWriter, writer.orcBatchWriter);
    +	}
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private class OrcBatchWriter {
    +
    +		private List<TypeInformation> typeInfos;
    +
    +		public OrcBatchWriter(List<TypeInformation> typeInfos) {
    +			this.typeInfos = typeInfos;
    +		}
    +
    +		/**
    +		 * Fill the record into {@code VectorizedRowBatch}, return false if batch is full.
    +		 *
    +		 * @param batch  An reusable orc VectorizedRowBatch.
    +		 * @param record Input record.
    +		 * @return Return false if batch is full.
    +		 */
    +		private boolean fill(VectorizedRowBatch batch, T record) {
    +			// If the batch is full, write it out and start over.
    +			if (batch.size == batch.getMaxSize()) {
    +				return false;
    +			} else {
    +				IntStream.range(0, typeInfos.size()).forEach(
    +					index -> {
    +						setColumnVectorValueByType(typeInfos.get(index), batch, index, batch.size, record);
    +					}
    +				);
    +				batch.size += 1;
    +				return true;
    +			}
    +		}
    +
    +		private void setColumnVectorValueByType(TypeInformation typeInfo,
    +												VectorizedRowBatch batch,
    +												int index,
    +												int nextPosition,
    +												T record) {
    +			switch (typeInfo.toString()) {
    +				case "Long":
    +					LongColumnVector longColumnVector = (LongColumnVector) batch.cols[index];
    +					longColumnVector.vector[nextPosition] = (Long) record.getField(index);
    +					break;
    +				case "Boolean":
    +					LongColumnVector booleanColumnVector = (LongColumnVector) batch.cols[index];
    +					Boolean bool = (Boolean) record.getField(index);
    +					int boolValue;
    +					if (bool) {
    +						boolValue = 1;
    +					} else {
    +						boolValue = 0;
    +					}
    +					booleanColumnVector.vector[nextPosition] = boolValue;
    +					break;
    +				case "Short":
    +					LongColumnVector shortColumnVector = (LongColumnVector) batch.cols[index];
    +					shortColumnVector.vector[nextPosition] = (Short) record.getField(index);
    +					break;
    +				case "Integer":
    +					LongColumnVector intColumnVector = (LongColumnVector) batch.cols[index];
    +					intColumnVector.vector[nextPosition] = (Integer) record.getField(index);
    +					break;
    +				case "Float":
    +					DoubleColumnVector floatColumnVector = (DoubleColumnVector) batch.cols[index];
    +					floatColumnVector.vector[nextPosition] = (Float) record.getField(index);
    +					break;
    +				case "Double":
    +					DoubleColumnVector doubleColumnVector = (DoubleColumnVector) batch.cols[index];
    +					doubleColumnVector.vector[nextPosition] = (Double) record.getField(index);
    +					break;
    +				case "String":
    +					BytesColumnVector stringColumnVector = (BytesColumnVector) batch.cols[index];
    +					stringColumnVector.setVal(nextPosition, ((String) record.getField(index)).getBytes(StandardCharsets.UTF_8));
    +					break;
    +				default:
    +					throw new IllegalArgumentException("Unsupported column type " + typeInfo);
    --- End diff --
    
    You might want to do this type check in the constructor as well.
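    A minimal sketch of such an eager check, meant to live inside `OrcFileWriter` and reusing the `schemaToTypeInfo` conversion already imported there; the set of supported names simply mirrors the switch above and is illustrative:

    ```java
    // Sketch: validate the field types eagerly so unsupported types fail at
    // construction time instead of on the first write().
    // Requires java.util.Set, java.util.HashSet and java.util.Arrays imports.
    private static final Set<String> SUPPORTED_TYPES = new HashSet<>(Arrays.asList(
    	"Long", "Boolean", "Short", "Integer", "Float", "Double", "String"));

    private static void validateSupportedTypes(TypeDescription schema) {
    	for (TypeDescription child : schema.getChildren()) {
    		String typeName = schemaToTypeInfo(child).toString();
    		if (!SUPPORTED_TYPES.contains(typeName)) {
    			throw new IllegalArgumentException("Unsupported column type " + typeName);
    		}
    	}
    }
    ```

    The constructor could then call `validateSupportedTypes(this.schema)` right after parsing the meta schema.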


---

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6075#discussion_r200831360
  
    --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    +import org.apache.flink.streaming.connectors.fs.Writer;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
    +
    +/**
    + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
    + *
    + * @param <T> The type of the elements that are being written by the sink.
    + */
    +public class OrcFileWriter<T extends Row> extends StreamWriterBase<T> {
    +
    +	private static final long serialVersionUID = 3L;
    +
    +	/**
    +	 * The description of the types in an ORC file.
    +	 */
    +	private TypeDescription schema;
    +
    +	/**
    +	 * The schema of an ORC file.
    +	 */
    +	private String metaSchema;
    +
    +	/**
    +	 * A row batch that will be written to the ORC file.
    +	 */
    +	private VectorizedRowBatch rowBatch;
    +
    +	/**
    +	 * The writer that fill the records into the batch.
    +	 */
    +	private OrcBatchWriter orcBatchWriter;
    +
    +	private transient org.apache.orc.Writer writer;
    +
    +	private CompressionKind compressionKind;
    +
    +	/**
    +	 * The number of rows that currently being written.
    +	 */
    +	private long writedRowSize;
    +
    +	/**
    +	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
    +	 *
    +	 * @param metaSchema The orc schema.
    +	 */
    +	public OrcFileWriter(String metaSchema) {
    +		this(metaSchema, CompressionKind.NONE);
    +	}
    +
    +	/**
    +	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
    +	 * schema and compression kind.
    +	 *
    +	 * @param metaSchema      The schema of an orc file.
    +	 * @param compressionKind The compression kind to use.
    +	 */
    +	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
    +		this.metaSchema = metaSchema;
    +		this.schema = TypeDescription.fromString(metaSchema);
    +		this.compressionKind = compressionKind;
    +	}
    +
    +	@Override
    +	public void open(FileSystem fs, Path path) throws IOException {
    +		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
    +		rowBatch = schema.createRowBatch();
    +		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
    +	}
    +
    +	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
    +		List<String> fieldNames = orcSchema.getFieldNames();
    +		List<TypeDescription> typeDescriptions = orcSchema.getChildren();
    +		List<TypeInformation> typeInformations = new ArrayList<>();
    +
    +		typeDescriptions.forEach(typeDescription -> {
    +			typeInformations.add(schemaToTypeInfo(typeDescription));
    +		});
    +
    +		return new TableSchema(
    +			fieldNames.toArray(new String[fieldNames.size()]),
    +			typeInformations.toArray(new TypeInformation[typeInformations.size()]));
    +	}
    +
    +	@Override
    +	public void write(T element) throws IOException {
    +		Boolean isFill = orcBatchWriter.fill(rowBatch, element);
    +		if (!isFill) {
    --- End diff --
    
    With the current implementation, when the user calls write() and the rowBatch is full, you flush the batch to the ORC file. But where does the input parameter "element" go?
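    One way to address this, as a sketch of a corrected write(): flush the full batch, reset it, and then fill the pending element into the now-empty batch, also fixing the row-count accumulation noted earlier (illustrative only, based on the fields already present in the class):

    ```java
    // Sketch: write() that does not drop the element when the batch is full,
    // and that accumulates the row count correctly.
    @Override
    public void write(T element) throws IOException {
    	if (!orcBatchWriter.fill(rowBatch, element)) {
    		// The batch was full: write it out, reset it, then add the pending element.
    		writer.addRowBatch(rowBatch);
    		writedRowSize += rowBatch.size;
    		rowBatch.reset();
    		orcBatchWriter.fill(rowBatch, element);
    	}
    }
    ```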


---

[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/6075
  
    Hi @zhangminglei, thanks very much for your PR.
    
    As for dependencies, I think it is better to move the ORC-related classes into the `flink-orc` module, so we don't need to add an ORC dependency to the `flink-connector-filesystem` module.
    
    Moreover, `OrcFileWriter` should implement the `CheckpointedFunction` interface to prevent loss of the data in `rowBatch` during job failover. During checkpointing, you can store the data in `rowBatch` in state, or call `flush()`. Similar logic can be found in `BucketingSink`.
    
    Best, Hequn.
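    For illustration, a minimal sketch of flushing the buffered `rowBatch` on checkpoints via `CheckpointedFunction`; the subclass and constructor shown here are assumptions, and how this would actually be wired into the sink's checkpointing is left open:

    ```java
    // Sketch: flush the in-memory row batch on every checkpoint so buffered rows
    // are not lost on failover. The snapshotState() body carries the idea.
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.types.Row;

    public class CheckpointedOrcFileWriter<T extends Row> extends OrcFileWriter<T>
    		implements CheckpointedFunction {

    	public CheckpointedOrcFileWriter(String metaSchema) {
    		super(metaSchema);
    	}

    	@Override
    	public void snapshotState(FunctionSnapshotContext context) throws Exception {
    		// Push rows still sitting in the in-memory batch to the ORC writer.
    		flush();
    	}

    	@Override
    	public void initializeState(FunctionInitializationContext context) throws Exception {
    		// Nothing to restore in this sketch; pending rows could be kept in state instead.
    	}
    }
    ```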


---