You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:40 UTC
[25/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
deleted file mode 100644
index 72472fc..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * FileScanner for reading Avro files
- */
-public class AvroScanner extends FileScanner {
- private Schema avroSchema;
- private List<Schema.Field> avroFields;
- private DataFileReader<GenericRecord> dataFileReader;
- private int[] projectionMap;
-
- /**
- * Creates a new AvroScanner.
- *
- * @param conf
- * @param schema
- * @param meta
- * @param fragment
- */
- public AvroScanner(Configuration conf,
- final org.apache.tajo.catalog.Schema schema,
- final TableMeta meta, final Fragment fragment) {
- super(conf, schema, meta, fragment);
- }
-
- /**
- * Initializes the AvroScanner.
- */
- @Override
- public void init() throws IOException {
- if (targets == null) {
- targets = schema.toArray();
- }
- prepareProjection(targets);
-
- avroSchema = AvroUtil.getAvroSchema(meta, conf);
- avroFields = avroSchema.getFields();
-
- DatumReader<GenericRecord> datumReader =
- new GenericDatumReader<GenericRecord>(avroSchema);
- SeekableInput input = new FsInput(fragment.getPath(), conf);
- dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
- super.init();
- }
-
- private void prepareProjection(Column[] targets) {
- projectionMap = new int[targets.length];
- for (int i = 0; i < targets.length; ++i) {
- projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName());
- }
- }
-
- private static String fromAvroString(Object value) {
- if (value instanceof Utf8) {
- Utf8 utf8 = (Utf8)value;
- return utf8.toString();
- }
- return value.toString();
- }
-
- private static Schema getNonNull(Schema schema) {
- if (!schema.getType().equals(Schema.Type.UNION)) {
- return schema;
- }
- List<Schema> schemas = schema.getTypes();
- if (schemas.size() != 2) {
- return schema;
- }
- if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
- return schemas.get(1);
- } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
- return schemas.get(0);
- } else {
- return schema;
- }
- }
-
- private Datum convertInt(Object value, TajoDataTypes.Type tajoType) {
- int intValue = (Integer)value;
- switch (tajoType) {
- case BIT:
- return DatumFactory.createBit((byte)(intValue & 0xff));
- case INT2:
- return DatumFactory.createInt2((short)intValue);
- default:
- return DatumFactory.createInt4(intValue);
- }
- }
-
- private Datum convertBytes(Object value, TajoDataTypes.Type tajoType,
- DataType dataType) {
- ByteBuffer buffer = (ByteBuffer)value;
- byte[] bytes = new byte[buffer.capacity()];
- buffer.get(bytes, 0, bytes.length);
- switch (tajoType) {
- case INET4:
- return DatumFactory.createInet4(bytes);
- case PROTOBUF:
- try {
- ProtobufDatumFactory factory =
- ProtobufDatumFactory.get(dataType.getCode());
- Message.Builder builder = factory.newBuilder();
- builder.mergeFrom(bytes);
- return factory.createDatum(builder);
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- default:
- return new BlobDatum(bytes);
- }
- }
-
- private Datum convertString(Object value, TajoDataTypes.Type tajoType) {
- switch (tajoType) {
- case CHAR:
- return DatumFactory.createChar(fromAvroString(value));
- default:
- return DatumFactory.createText(fromAvroString(value));
- }
- }
-
- /**
- * Reads the next Tuple from the Avro file.
- *
- * @return The next Tuple from the Avro file or null if end of file is
- * reached.
- */
- @Override
- public Tuple next() throws IOException {
- if (!dataFileReader.hasNext()) {
- return null;
- }
-
- Tuple tuple = new VTuple(schema.size());
- GenericRecord record = dataFileReader.next();
- for (int i = 0; i < projectionMap.length; ++i) {
- int columnIndex = projectionMap[i];
- Object value = record.get(columnIndex);
- if (value == null) {
- tuple.put(columnIndex, NullDatum.get());
- continue;
- }
-
- // Get Avro type.
- Schema.Field avroField = avroFields.get(columnIndex);
- Schema nonNullAvroSchema = getNonNull(avroField.schema());
- Schema.Type avroType = nonNullAvroSchema.getType();
-
- // Get Tajo type.
- Column column = schema.getColumn(columnIndex);
- DataType dataType = column.getDataType();
- TajoDataTypes.Type tajoType = dataType.getType();
- switch (avroType) {
- case NULL:
- tuple.put(columnIndex, NullDatum.get());
- break;
- case BOOLEAN:
- tuple.put(columnIndex, DatumFactory.createBool((Boolean)value));
- break;
- case INT:
- tuple.put(columnIndex, convertInt(value, tajoType));
- break;
- case LONG:
- tuple.put(columnIndex, DatumFactory.createInt8((Long)value));
- break;
- case FLOAT:
- tuple.put(columnIndex, DatumFactory.createFloat4((Float)value));
- break;
- case DOUBLE:
- tuple.put(columnIndex, DatumFactory.createFloat8((Double)value));
- break;
- case BYTES:
- tuple.put(columnIndex, convertBytes(value, tajoType, dataType));
- break;
- case STRING:
- tuple.put(columnIndex, convertString(value, tajoType));
- break;
- case RECORD:
- throw new RuntimeException("Avro RECORD not supported.");
- case ENUM:
- throw new RuntimeException("Avro ENUM not supported.");
- case MAP:
- throw new RuntimeException("Avro MAP not supported.");
- case UNION:
- throw new RuntimeException("Avro UNION not supported.");
- case FIXED:
- tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes()));
- break;
- default:
- throw new RuntimeException("Unknown type.");
- }
- }
- return tuple;
- }
-
- /**
- * Resets the scanner
- */
- @Override
- public void reset() throws IOException {
- }
-
- /**
- * Closes the scanner.
- */
- @Override
- public void close() throws IOException {
- if (dataFileReader != null) {
- dataFileReader.close();
- }
- }
-
- /**
- * Returns whether this scanner is projectable.
- *
- * @return true
- */
- @Override
- public boolean isProjectable() {
- return true;
- }
-
- /**
- * Returns whether this scanner is selectable.
- *
- * @return false
- */
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- /**
- * Returns whether this scanner is splittable.
- *
- * @return false
- */
- @Override
- public boolean isSplittable() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
deleted file mode 100644
index 0d14c3d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.StorageConstants;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-
-public class AvroUtil {
- public static Schema getAvroSchema(TableMeta meta, Configuration conf)
- throws IOException {
-
- boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL);
- boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL);
- if (!isSchemaLiteral && !isSchemaUrl) {
- throw new RuntimeException("No Avro schema for table.");
- }
- if (isSchemaLiteral) {
- String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL);
- return new Schema.Parser().parse(schema);
- }
-
- String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
- if (schemaURL.toLowerCase().startsWith("http")) {
- return getAvroSchemaFromHttp(schemaURL);
- } else {
- return getAvroSchemaFromFileSystem(schemaURL, conf);
- }
- }
-
- public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException {
- InputStream inputStream = new URL(schemaURL).openStream();
-
- try {
- return new Schema.Parser().parse(inputStream);
- } finally {
- IOUtils.closeStream(inputStream);
- }
- }
-
- public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException {
- Path schemaPath = new Path(schemaURL);
- FileSystem fs = schemaPath.getFileSystem(conf);
- FSDataInputStream inputStream = fs.open(schemaPath);
-
- try {
- return new Schema.Parser().parse(inputStream);
- } finally {
- IOUtils.closeStream(inputStream);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
deleted file mode 100644
index 40d1545..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * <p>
- * Provides read and write support for Avro files. Avro schemas are
- * converted to Tajo schemas according to the following mapping of Avro
- * and Tajo types:
- * </p>
- *
- * <table>
- * <tr>
- * <th>Avro type</th>
- * <th>Tajo type</th>
- * </tr>
- * <tr>
- * <td>NULL</td>
- * <td>NULL_TYPE</td>
- * </tr>
- * <tr>
- * <td>BOOLEAN</td>
- * <td>BOOLEAN</td>
- * </tr>
- * <tr>
- * <td>INT</td>
- * <td>INT4</td>
- * </tr>
- * <tr>
- * <td>LONG</td>
- * <td>INT8</td>
- * </tr>
- * <tr>
- * <td>FLOAT</td>
- * <td>FLOAT4</td>
- * </tr>
- * <tr>
- * <td>DOUBLE</td>
- * <td>FLOAT8</td>
- * </tr>
- * <tr>
- * <td>BYTES</td>
- * <td>BLOB</td>
- * </tr>
- * <tr>
- * <td>STRING</td>
- * <td>TEXT</td>
- * </tr>
- * <tr>
- * <td>FIXED</td>
- * <td>BLOB</td>
- * </tr>
- * <tr>
- * <td>RECORD</td>
- * <td>Not currently supported.</td>
- * </tr>
- * <tr>
- * <td>ENUM</td>
- * <td>Not currently supported.</td>
- * </tr>
- * <tr>
- * <td>MAP</td>
- * <td>Not currently supported.</td>
- * </tr>
- * <tr>
- * <td>UNION</td>
- * <td>Not currently supported.</td>
- * </tr>
- * </table>
- */
-
-package org.apache.tajo.storage.avro;
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
deleted file mode 100644
index baeda8c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.compress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DoNotPool;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A global compressor/decompressor pool used to save and reuse (possibly
- * native) compression/decompression codecs.
- */
-public final class CodecPool {
- private static final Log LOG = LogFactory.getLog(CodecPool.class);
-
- /**
- * A global compressor pool used to save the expensive
- * construction/destruction of (possibly native) decompression codecs.
- */
- private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
- new HashMap<Class<Compressor>, List<Compressor>>();
-
- /**
- * A global decompressor pool used to save the expensive
- * construction/destruction of (possibly native) decompression codecs.
- */
- private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
- new HashMap<Class<Decompressor>, List<Decompressor>>();
-
- private static <T> T borrow(Map<Class<T>, List<T>> pool,
- Class<? extends T> codecClass) {
- T codec = null;
-
- // Check if an appropriate codec is available
- synchronized (pool) {
- if (pool.containsKey(codecClass)) {
- List<T> codecList = pool.get(codecClass);
-
- if (codecList != null) {
- synchronized (codecList) {
- if (!codecList.isEmpty()) {
- codec = codecList.remove(codecList.size() - 1);
- }
- }
- }
- }
- }
-
- return codec;
- }
-
- private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
- if (codec != null) {
- Class<T> codecClass = (Class<T>) codec.getClass();
- synchronized (pool) {
- if (!pool.containsKey(codecClass)) {
- pool.put(codecClass, new ArrayList<T>());
- }
-
- List<T> codecList = pool.get(codecClass);
- synchronized (codecList) {
- codecList.add(codec);
- }
- }
- }
- }
-
- /**
- * Get a {@link Compressor} for the given {@link CompressionCodec} from the
- * pool or a new one.
- *
- * @param codec
- * the <code>CompressionCodec</code> for which to get the
- * <code>Compressor</code>
- * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
- * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
- * from the pool or a new one
- */
- public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
- Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
- if (compressor == null) {
- compressor = codec.createCompressor();
- LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
- } else {
- compressor.reinit(conf);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Got recycled compressor");
- }
- }
- return compressor;
- }
-
- public static Compressor getCompressor(CompressionCodec codec) {
- return getCompressor(codec, null);
- }
-
- /**
- * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
- * pool or a new one.
- *
- * @param codec
- * the <code>CompressionCodec</code> for which to get the
- * <code>Decompressor</code>
- * @return <code>Decompressor</code> for the given
- * <code>CompressionCodec</code> the pool or a new one
- */
- public static Decompressor getDecompressor(CompressionCodec codec) {
- Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
- .getDecompressorType());
- if (decompressor == null) {
- decompressor = codec.createDecompressor();
- LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Got recycled decompressor");
- }
- }
- return decompressor;
- }
-
- /**
- * Return the {@link Compressor} to the pool.
- *
- * @param compressor
- * the <code>Compressor</code> to be returned to the pool
- */
- public static void returnCompressor(Compressor compressor) {
- if (compressor == null) {
- return;
- }
- // if the compressor can't be reused, don't pool it.
- if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
- return;
- }
- compressor.reset();
- payback(COMPRESSOR_POOL, compressor);
- }
-
- /**
- * Return the {@link Decompressor} to the pool.
- *
- * @param decompressor
- * the <code>Decompressor</code> to be returned to the pool
- */
- public static void returnDecompressor(Decompressor decompressor) {
- if (decompressor == null) {
- return;
- }
- // if the decompressor can't be reused, don't pool it.
- if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
- return;
- }
- decompressor.reset();
- payback(DECOMPRESSOR_POOL, decompressor);
- }
-
- private CodecPool() {
- // prevent instantiation
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
deleted file mode 100644
index bb035a8..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.storage.exception;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-
-public class AlreadyExistsStorageException extends IOException {
- private static final long serialVersionUID = 965518916144019032L;
-
-
- public AlreadyExistsStorageException(String path) {
- super("Error: "+path+" alreay exists");
- }
-
- public AlreadyExistsStorageException(Path path) {
- this(path.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
deleted file mode 100644
index a67d1f7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.exception;
-
/**
 * Checked exception with an optional detail message; by its name, intended to
 * report an unrecognized compression codec.
 */
public class UnknownCodecException extends Exception {
  private static final long serialVersionUID = 4287230843540404529L;

  /**
   * Builds the exception with a detail message.
   *
   * @param message text returned by {@link #getMessage()}
   */
  public UnknownCodecException(String message) {
    super(message);
  }

  /** Builds the exception without a detail message. */
  public UnknownCodecException() {
    this((String) null);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
deleted file mode 100644
index d18b5a0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.exception;
-
/**
 * Checked exception with an optional detail message; by its name, intended to
 * report a data type that is not recognized.
 */
public class UnknownDataTypeException extends Exception {
  private static final long serialVersionUID = -2630390595968966164L;

  /**
   * Builds the exception with a detail message.
   *
   * @param message text returned by {@link #getMessage()}
   */
  public UnknownDataTypeException(String message) {
    super(message);
  }

  /** Builds the exception without a detail message. */
  public UnknownDataTypeException() {
    this((String) null);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
deleted file mode 100644
index 8b197d6..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.storage.exception;
-
/**
 * Unchecked exception with an optional detail message; by its name, intended
 * to report a file type that is not supported.
 */
public class UnsupportedFileTypeException extends RuntimeException {
  private static final long serialVersionUID = -8160289695849000342L;

  /**
   * Builds the exception with a detail message.
   *
   * @param message text returned by {@link #getMessage()}
   */
  public UnsupportedFileTypeException(String message) {
    super(message);
  }

  /** Builds the exception without a detail message. */
  public UnsupportedFileTypeException() {
    this((String) null);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
deleted file mode 100644
index 4a83dbf..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-/**
- * A {@link Fragment} describing a contiguous range of one file: a path, a start offset
- * and a length, plus optional host names and per-host disk ids.
- */
-public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
-  @Expose private String tableName; // required
-  @Expose private Path uri; // required
-  @Expose public Long startOffset; // required
-  @Expose public Long length; // required
-
-  private String[] hosts; // Datanode hostnames
-  @Expose private int[] diskIds;
-
-  /**
-   * Creates a fragment from the serialized bytes of a {@link FileFragmentProto}.
-   *
-   * @param raw serialized {@code FileFragmentProto}
-   * @throws InvalidProtocolBufferException if {@code raw} cannot be parsed
-   */
-  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.mergeFrom(raw);
-    init(builder.build());
-  }
-
-  /**
-   * Creates a fragment whose offset, length and hosts are taken from {@code blockLocation}.
-   *
-   * @throws IOException propagated from {@code BlockLocation#getHosts()}
-   */
-  public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
-      throws IOException {
-    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
-  }
-
-  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) {
-    this.set(tableName, uri, start, length, hosts, diskIds);
-  }
-
-  // Non splittable
-  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
-    this.set(tableName, uri, start, length, hosts, null);
-  }
-
-  public FileFragment(String fragmentId, Path path, long start, long length) {
-    this.set(fragmentId, path, start, length, null, null);
-  }
-
-  public FileFragment(FileFragmentProto proto) {
-    init(proto);
-  }
-
-  /** Copies all fields out of {@code proto}, unboxing the repeated disk ids into an int array. */
-  private void init(FileFragmentProto proto) {
-    int[] diskIds = new int[proto.getDiskIdsList().size()];
-    int i = 0;
-    for (Integer eachValue : proto.getDiskIdsList()) {
-      diskIds[i++] = eachValue;
-    }
-    this.set(proto.getId(), new Path(proto.getPath()),
-        proto.getStartOffset(), proto.getLength(),
-        proto.getHostsList().toArray(new String[]{}),
-        diskIds);
-  }
-
-  private void set(String tableName, Path path, long start,
-                   long length, String[] hosts, int[] diskIds) {
-    this.tableName = tableName;
-    this.uri = path;
-    this.startOffset = start;
-    this.length = length;
-    this.hosts = hosts;
-    this.diskIds = diskIds;
-  }
-
-  /**
-   * Get the list of hosts (hostname) hosting this block.
-   * Never returns {@code null}; an empty array is stored and returned when no hosts are known.
-   */
-  public String[] getHosts() {
-    if (hosts == null) {
-      this.hosts = new String[0];
-    }
-    return hosts;
-  }
-
-  /**
-   * Get the list of Disk Ids.
-   * Unknown disk is -1. Others 0 ~ N. When no ids were supplied, an array of -1 with one
-   * entry per host is created and stored.
-   */
-  public int[] getDiskIds() {
-    if (diskIds == null) {
-      this.diskIds = new int[getHosts().length];
-      Arrays.fill(this.diskIds, -1);
-    }
-    return diskIds;
-  }
-
-  public void setDiskIds(int[] diskIds) {
-    this.diskIds = diskIds;
-  }
-
-  @Override
-  public String getTableName() {
-    return this.tableName;
-  }
-
-  public Path getPath() {
-    return this.uri;
-  }
-
-  public void setPath(Path path) {
-    this.uri = path;
-  }
-
-  public Long getStartKey() {
-    return this.startOffset;
-  }
-
-  @Override
-  public String getKey() {
-    return this.uri.toString();
-  }
-
-  @Override
-  public long getLength() {
-    return this.length;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return this.length <= 0;
-  }
-
-  /**
-   * Orders fragments by path first, then by start offset.
-   *
-   * The offset range of tablets <b>MUST NOT</b> be overlapped.
-   *
-   * @param t the fragment to compare with
-   * @return a negative number, zero or a positive number as this fragment sorts before,
-   *         together with, or after {@code t}
-   */
-  @Override
-  public int compareTo(FileFragment t) {
-    if (!getPath().equals(t.getPath())) {
-      // Compare the paths themselves so the result is antisymmetric, as Comparable requires.
-      return getPath().compareTo(t.getPath());
-    }
-    long mine = this.getStartKey();
-    long theirs = t.getStartKey();
-    // Compare without subtraction so extreme offsets cannot overflow.
-    return mine < theirs ? -1 : (mine == theirs ? 0 : 1);
-  }
-
-  /** Two fragments are equal when path, start offset and length are equal (table name is ignored). */
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof FileFragment) {
-      FileFragment t = (FileFragment) o;
-      if (getPath().equals(t.getPath())
-          && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
-          && TUtil.checkEquals(t.getLength(), this.getLength())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /** Hashes exactly the fields compared by {@link #equals(Object)} so equal fragments hash equally. */
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(uri, startOffset, length);
-  }
-
-  /** Returns a copy whose host and disk-id arrays are independent of this fragment's arrays. */
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    FileFragment frag = (FileFragment) super.clone();
-    // super.clone() copies array references; copy the arrays so the clone cannot mutate ours.
-    frag.hosts = hosts == null ? null : hosts.clone();
-    frag.diskIds = diskIds == null ? null : diskIds.clone();
-    return frag;
-  }
-
-  @Override
-  public String toString() {
-    return "\"fragment\": {\"id\": \"" + tableName + "\", \"path\": \""
-        + getPath() + "\", \"start\": " + this.getStartKey() + ", \"length\": "
-        + getLength() + "}";
-  }
-
-  /** Serializes this fragment as a FileFragmentProto wrapped in a FragmentProto tagged with store type CSV. */
-  public FragmentProto getProto() {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.setId(this.tableName);
-    builder.setStartOffset(this.startOffset);
-    builder.setLength(this.length);
-    builder.setPath(this.uri.toString());
-    if (diskIds != null) {
-      List<Integer> idList = new ArrayList<Integer>();
-      for (int eachId : diskIds) {
-        idList.add(eachId);
-      }
-      builder.addAllDiskIds(idList);
-    }
-
-    if (hosts != null) {
-      builder.addAllHosts(TUtil.newList(hosts));
-    }
-
-    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.tableName);
-    fragmentBuilder.setStoreType(StoreType.CSV.name());
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    return fragmentBuilder.build();
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
deleted file mode 100644
index ac43197..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import org.apache.tajo.common.ProtoObject;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-/**
- * A piece of a table that can be serialized to a {@link FragmentProto}.
- */
-public interface Fragment extends ProtoObject<FragmentProto> {
-
-  /** @return the name of the table this fragment belongs to */
-  String getTableName();
-
-  /** @return the protobuf form of this fragment */
-  @Override
-  FragmentProto getProto();
-
-  /** @return the length of this fragment */
-  long getLength();
-
-  /** @return a string key identifying this fragment's data */
-  String getKey();
-
-  /** @return host names associated with this fragment */
-  String[] getHosts();
-
-  /** @return {@code true} if this fragment holds no data */
-  boolean isEmpty();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
deleted file mode 100644
index 07720c7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.annotation.ThreadSafe;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-/**
- * Converts between {@link FragmentProto} messages and concrete {@link Fragment} classes.
- * The fragment class for a store type is read from the configuration key
- * {@code tajo.storage.fragment.<storetype>.class} and must declare a constructor taking
- * a single {@link ByteString}.
- */
-@ThreadSafe
-public class FragmentConvertor {
-  /**
-   * Cache of fragment classes, keyed by lower-cased store type.
-   */
-  protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
-  /**
-   * Cache of constructors for each class.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  /**
-   * default parameter for all constructors
-   */
-  private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class };
-
-  /**
-   * Resolves (and caches) the fragment class configured for {@code storeType}.
-   *
-   * @throws IOException if no fragment class is configured for the store type
-   */
-  public static Class<? extends Fragment> getFragmentClass(Configuration conf, String storeType)
-      throws IOException {
-    String key = storeType.toLowerCase();
-    Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(key);
-    if (fragmentClass == null) {
-      fragmentClass = conf.getClass(
-          String.format("tajo.storage.fragment.%s.class", key), null, Fragment.class);
-      if (fragmentClass == null) {
-        // Check before caching: concurrent maps reject null values and would throw an NPE.
-        throw new IOException("No such a fragment for " + key);
-      }
-      CACHED_FRAGMENT_CLASSES.put(key, fragmentClass);
-    }
-    return fragmentClass;
-  }
-
-  /**
-   * Instantiates {@code clazz} via its {@code (ByteString)} constructor using the proto's contents.
-   *
-   * @throws RuntimeException wrapping any reflection or constructor failure
-   */
-  @SuppressWarnings("unchecked")
-  public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) {
-    try {
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, constructor);
-      }
-      return constructor.newInstance(new Object[]{fragment.getContents()});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Converts {@code fragment} using the class configured for its store type.
-   *
-   * @throws IOException if no fragment class is configured for the store type
-   */
-  @SuppressWarnings("unchecked")
-  public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment)
-      throws IOException {
-    // getFragmentClass() never returns null; it throws for unknown store types.
-    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getStoreType());
-    return convert(fragmentClass, fragment);
-  }
-
-  /** Converts every proto with {@code clazz}; a {@code null} array yields an empty list. */
-  public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments)
-      throws IOException {
-    List<T> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (FragmentProto proto : fragments) {
-      list.add(convert(clazz, proto));
-    }
-    return list;
-  }
-
-  /** Converts every proto using its configured class; a {@code null} array yields an empty list. */
-  @SuppressWarnings("unchecked")
-  public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) throws IOException {
-    List<T> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (FragmentProto proto : fragments) {
-      list.add((T) convert(conf, proto));
-    }
-    return list;
-  }
-
-  /** Serializes each fragment; a {@code null} array yields an empty list. */
-  public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) {
-    List<FragmentProto> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (Fragment fragment : fragments) {
-      list.add(fragment.getProto());
-    }
-    return list;
-  }
-
-  /** Array form of {@link #toFragmentProtoList(Fragment...)}. */
-  public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
-    List<FragmentProto> list = toFragmentProtoList(fragments);
-    return list.toArray(new FragmentProto[list.size()]);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
deleted file mode 100644
index 8615235..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.TableStatistics;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.util.TUtil;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An abstract class for HBase appender.
- * <p>
- * {@link #init()} derives per-column lookup arrays from the table's {@link ColumnMapping};
- * {@link #getRowKeyBytes(Tuple)} and {@link #readKeyValues(Tuple, byte[])} use them to turn a
- * tuple into a row key and HBase {@link KeyValue}s.
- */
-public abstract class AbstractHBaseAppender implements Appender {
-  protected Configuration conf;
-  protected Schema schema;
-  protected TableMeta meta;
-  protected QueryUnitAttemptId taskAttemptId;
-  protected Path stagingDir;
-  protected boolean inited = false;
-
-  protected ColumnMapping columnMapping;
-  protected TableStatistics stats;
-  protected boolean enabledStats;
-
-  protected int columnNum;
-
-  protected byte[][][] mappingColumnFamilies;
-  protected boolean[] isBinaryColumns;
-  protected boolean[] isRowKeyMappings;
-  protected boolean[] isColumnKeys;
-  protected boolean[] isColumnValues;
-  protected int[] rowKeyFieldIndexes;
-  protected int[] rowkeyColumnIndexes;
-  protected char rowKeyDelimiter;
-
-  // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping
-  protected int[] columnKeyValueDataIndexes;
-  protected byte[][] columnKeyDatas;
-  protected byte[][] columnValueDatas;
-  protected byte[][] columnKeyCfNames;
-
-  protected KeyValue[] keyValues;
-
-  // Reused buffer for composite row keys. It is reset on every call, so getRowKeyBytes()
-  // must not be called concurrently on the same instance.
-  private ByteArrayOutputStream bout = new ByteArrayOutputStream();
-
-  public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
-                               Schema schema, TableMeta meta, Path stagingDir) {
-    this.conf = conf;
-    this.schema = schema;
-    this.meta = meta;
-    this.stagingDir = stagingDir;
-    this.taskAttemptId = taskAttemptId;
-  }
-
-  /**
-   * Builds the column-mapping lookup arrays. May be called only once.
-   *
-   * @throws IllegalStateException if already initialized
-   * @throws IOException if the column mapping is invalid
-   */
-  @Override
-  public void init() throws IOException {
-    if (inited) {
-      throw new IllegalStateException("HBaseAppender is already initialized.");
-    }
-    inited = true;
-    if (enabledStats) {
-      stats = new TableStatistics(this.schema);
-    }
-    columnMapping = new ColumnMapping(schema, meta);
-
-    mappingColumnFamilies = columnMapping.getMappingColumns();
-
-    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-    List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
-    for (int i = 0; i < isRowKeyMappings.length; i++) {
-      if (isRowKeyMappings[i]) {
-        rowkeyColumnIndexList.add(i);
-      }
-    }
-    rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);
-
-    isBinaryColumns = columnMapping.getIsBinaryColumns();
-    isColumnKeys = columnMapping.getIsColumnKeys();
-    isColumnValues = columnMapping.getIsColumnValues();
-    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
-    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
-
-    this.columnNum = schema.size();
-
-    // In the case of '<cfname>:key:' or '<cfname>:value:' KeyValue object should be set with the qualifier and value
-    // which are mapped to the same column family.
-    columnKeyValueDataIndexes = new int[isColumnKeys.length];
-    int index = 0;
-    int numKeyValues = 0;
-    Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
-    for (int i = 0; i < isColumnKeys.length; i++) {
-      if (isRowKeyMappings[i]) {
-        continue;
-      }
-      if (isColumnKeys[i] || isColumnValues[i]) {
-        // All key/value columns of one family share a single KeyValue slot.
-        String cfName = new String(mappingColumnFamilies[i][0]);
-        if (!cfNameIndexMap.containsKey(cfName)) {
-          cfNameIndexMap.put(cfName, index);
-          columnKeyValueDataIndexes[i] = index;
-          index++;
-          numKeyValues++;
-        } else {
-          columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
-        }
-      } else {
-        numKeyValues++;
-      }
-    }
-    columnKeyCfNames = new byte[cfNameIndexMap.size()][];
-    for (Map.Entry<String, Integer> entry : cfNameIndexMap.entrySet()) {
-      columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
-    }
-    columnKeyDatas = new byte[cfNameIndexMap.size()][];
-    columnValueDatas = new byte[cfNameIndexMap.size()][];
-
-    keyValues = new KeyValue[numKeyValues];
-  }
-
-  /** Serializes one column value with the binary or text serializer configured for that column. */
-  private byte[] serializeColumn(int columnIndex, Datum datum) throws IOException {
-    if (isBinaryColumns[columnIndex]) {
-      return HBaseBinarySerializerDeserializer.serialize(schema.getColumn(columnIndex), datum);
-    }
-    return HBaseTextSerializerDeserializer.serialize(schema.getColumn(columnIndex), datum);
-  }
-
-  /**
-   * Builds the HBase row key for {@code tuple}. With several row-key columns, their serialized
-   * values are joined with the row-key delimiter.
-   *
-   * @throws IOException if no column is mapped to the row key, or serialization fails
-   */
-  protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
-    if (rowkeyColumnIndexes.length == 0) {
-      throw new IOException("No column is mapped to the HBase row key.");
-    }
-    if (rowkeyColumnIndexes.length == 1) {
-      int index = rowkeyColumnIndexes[0];
-      return serializeColumn(index, tuple.get(index));
-    }
-
-    bout.reset();
-    for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
-      int columnIndex = rowkeyColumnIndexes[i];
-      bout.write(serializeColumn(columnIndex, tuple.get(columnIndex)));
-      if (i < rowkeyColumnIndexes.length - 1) {
-        bout.write(rowKeyDelimiter);
-      }
-    }
-    return bout.toByteArray();
-  }
-
-  /**
-   * Fills {@link #keyValues} for {@code tuple}: one KeyValue per plain mapped column, followed by
-   * one per column family used by '<cfname>:key:' / '<cfname>:value:' mappings.
-   */
-  protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
-    int keyValIndex = 0;
-    for (int i = 0; i < columnNum; i++) {
-      if (isRowKeyMappings[i]) {
-        continue;
-      }
-      byte[] value = serializeColumn(i, tuple.get(i));
-
-      if (isColumnKeys[i]) {
-        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
-      } else if (isColumnValues[i]) {
-        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
-      } else {
-        keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
-        keyValIndex++;
-      }
-    }
-
-    for (int i = 0; i < columnKeyDatas.length; i++) {
-      keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
-    }
-  }
-
-  @Override
-  public void enableStats() {
-    enabledStats = true;
-  }
-
-  /** Returns collected stats, or {@code null} if stats are disabled or were enabled after init(). */
-  @Override
-  public TableStats getStats() {
-    // stats is created in init() only; guard against enableStats() being called afterwards.
-    if (enabledStats && stats != null) {
-      return stats.getTableStat();
-    } else {
-      return null;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
deleted file mode 100644
index 8044494..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.logical.SortNode.SortPurpose;
-import org.apache.tajo.plan.rewrite.RewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
-
-public class AddSortForInsertRewriter implements RewriteRule {
- private int[] sortColumnIndexes;
- private Column[] sortColumns;
- public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
- this.sortColumns = sortColumns;
- this.sortColumnIndexes = new int[sortColumns.length];
-
- Schema tableSchema = tableDesc.getSchema();
- for (int i = 0; i < sortColumns.length; i++) {
- sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
- }
- }
-
- @Override
- public String getName() {
- return "AddSortForInsertRewriter";
- }
-
- @Override
- public boolean isEligible(LogicalPlan plan) {
- StoreType storeType = PlannerUtil.getStoreType(plan);
- return storeType != null;
- }
-
- @Override
- public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- UnaryNode insertNode = rootNode.getChild();
- LogicalNode childNode = insertNode.getChild();
-
- Schema sortSchema = childNode.getOutSchema();
- SortNode sortNode = plan.createNode(SortNode.class);
- sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
- sortNode.setInSchema(sortSchema);
- sortNode.setOutSchema(sortSchema);
-
- SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
- int index = 0;
-
- for (int i = 0; i < sortColumnIndexes.length; i++) {
- Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
- if (sortColumn == null) {
- throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
- }
- sortSpecs[index++] = new SortSpec(sortColumn, true, true);
- }
- sortNode.setSortSpecs(sortSpecs);
-
- sortNode.setChild(insertNode.getChild());
- insertNode.setChild(sortNode);
- plan.getRootBlock().registerNode(sortNode);
-
- return plan;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
deleted file mode 100644
index f80bd5e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Parses the HBase column-mapping options of a table ({@code META_COLUMNS_KEY},
- * {@code META_ROWKEY_DELIMITER}, {@code META_TABLE_KEY}) into per-column lookup arrays
- * indexed in schema order.
- */
-public class ColumnMapping {
-  private static final String KEY_VALUE_SYNTAX =
-      " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' or '<cfname>:value:' or '<cfname>:value:#b'";
-
-  private TableMeta tableMeta;
-  private Schema schema;
-  private char rowKeyDelimiter;
-
-  private String hbaseTableName;
-
-  private int[] rowKeyFieldIndexes;
-  private boolean[] isRowKeyMappings;
-  private boolean[] isBinaryColumns;
-  private boolean[] isColumnKeys;
-  private boolean[] isColumnValues;
-
-  // schema order -> 0: cf name, 1: column name -> name bytes
-  private byte[][][] mappingColumns;
-
-  private int numRowKeys;
-
-  /**
-   * @throws IOException if the mapping option is missing or malformed
-   */
-  public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
-    this.schema = schema;
-    this.tableMeta = tableMeta;
-
-    init();
-  }
-
-  private void init() throws IOException {
-    hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
-    String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
-    if (delim.length() > 0) {
-      rowKeyDelimiter = delim.charAt(0);
-    }
-    isRowKeyMappings = new boolean[schema.size()];
-    rowKeyFieldIndexes = new int[schema.size()];
-    isBinaryColumns = new boolean[schema.size()];
-    isColumnKeys = new boolean[schema.size()];
-    isColumnValues = new boolean[schema.size()];
-
-    mappingColumns = new byte[schema.size()][][];
-
-    for (int i = 0; i < schema.size(); i++) {
-      rowKeyFieldIndexes[i] = -1;
-    }
-
-    String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
-    if (columnMapping == null || columnMapping.isEmpty()) {
-      throw new IOException("'columns' property is required.");
-    }
-
-    String[] columnMappingTokens = columnMapping.split(",");
-
-    int numTajoColumns = schema.getColumns().size();
-    if (columnMappingTokens.length != numTajoColumns) {
-      throw new IOException("The number of mapped HBase columns (" + columnMappingTokens.length
-          + ") does not match the number of Tajo table columns (" + numTajoColumns + ")");
-    }
-
-    int index = 0;
-    for (String eachToken : columnMappingTokens) {
-      mappingColumns[index] = new byte[2][];
-
-      byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':');
-
-      if (mappingTokens.length == 3) {
-        // <cfname>:key:[#b] or <cfname>:value:[#b]
-        if (mappingTokens[0].length == 0) {
-          throw new IOException(eachToken + KEY_VALUE_SYNTAX);
-        }
-        if (mappingTokens[2].length != 0) {
-          String binaryOption = new String(mappingTokens[2]);
-          if ("#b".equals(binaryOption)) {
-            isBinaryColumns[index] = true;
-          } else {
-            throw new IOException(eachToken + KEY_VALUE_SYNTAX);
-          }
-        }
-        mappingColumns[index][0] = mappingTokens[0];
-        String keyOrValue = new String(mappingTokens[1]);
-        if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
-          isColumnKeys[index] = true;
-        } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
-          isColumnValues[index] = true;
-        } else {
-          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
-        }
-      } else if (mappingTokens.length == 2) {
-        // <cfname>: or <cfname>:<qualifier> or :key or <fieldIndex>:key
-        String cfName = new String(mappingTokens[0]);
-        String columnName = new String(mappingTokens[1]);
-        RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName);
-        if (rowKeyMapping != null) {
-          isRowKeyMappings[index] = true;
-          numRowKeys++;
-          isBinaryColumns[index] = rowKeyMapping.isBinary();
-          if (!cfName.isEmpty()) {
-            if (rowKeyDelimiter == 0) {
-              throw new IOException("hbase.rowkey.delimiter is required.");
-            }
-            rowKeyFieldIndexes[index] = parseRowKeyFieldIndex(cfName);
-          } else {
-            rowKeyFieldIndexes[index] = -1; //rowkey is mapped a single column.
-          }
-        } else {
-          if (cfName.isEmpty()) {
-            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
-          }
-          mappingColumns[index][0] = Bytes.toBytes(cfName);
-
-          if (!columnName.isEmpty()) {
-            String[] columnNameTokens = columnName.split("#");
-            if (columnNameTokens[0].isEmpty()) {
-              mappingColumns[index][1] = null;
-            } else {
-              mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]);
-            }
-            if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) {
-              isBinaryColumns[index] = true;
-            }
-          }
-        }
-      } else {
-        throw new IOException(eachToken + " 'column' attribute should be '[cfname]:[qualifier]'");
-      }
-
-      index++;
-    } // for loop
-  }
-
-  /** Returns the distinct column family names in first-seen schema order. */
-  public List<String> getColumnFamilyNames() {
-    List<String> cfNames = new ArrayList<String>();
-
-    for (byte[][] eachCfName : mappingColumns) {
-      if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) {
-        String cfName = new String(eachCfName[0]);
-        if (!cfNames.contains(cfName)) {
-          cfNames.add(cfName);
-        }
-      }
-    }
-
-    return cfNames;
-  }
-
-  /**
-   * Returns a row-key mapping when {@code columnName} is {@code key} or {@code key#b}, else {@code null}.
-   *
-   * @throws IOException if {@code cfName} is present but is not an integer field index
-   */
-  private RowKeyMapping getRowKeyMapping(String cfName, String columnName) throws IOException {
-    if (columnName == null || columnName.isEmpty()) {
-      return null;
-    }
-
-    String[] tokens = columnName.split("#");
-    if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) {
-      return null;
-    }
-
-    RowKeyMapping rowKeyMapping = new RowKeyMapping();
-
-    if (tokens.length == 2 && "b".equals(tokens[1])) {
-      rowKeyMapping.setBinary(true);
-    }
-
-    if (cfName != null && !cfName.isEmpty()) {
-      rowKeyMapping.setKeyFieldIndex(parseRowKeyFieldIndex(cfName));
-    }
-    return rowKeyMapping;
-  }
-
-  /** Parses a row-key field index, reporting malformed input as an IOException like other mapping errors. */
-  private static int parseRowKeyFieldIndex(String fieldIndex) throws IOException {
-    try {
-      return Integer.parseInt(fieldIndex);
-    } catch (NumberFormatException e) {
-      throw new IOException("Row key field index must be an integer, but was '" + fieldIndex + "'", e);
-    }
-  }
-
-  public char getRowKeyDelimiter() {
-    return rowKeyDelimiter;
-  }
-
-  public int[] getRowKeyFieldIndexes() {
-    return rowKeyFieldIndexes;
-  }
-
-  public boolean[] getIsRowKeyMappings() {
-    return isRowKeyMappings;
-  }
-
-  public byte[][][] getMappingColumns() {
-    return mappingColumns;
-  }
-
-  public Schema getSchema() {
-    return schema;
-  }
-
-  public boolean[] getIsBinaryColumns() {
-    return isBinaryColumns;
-  }
-
-  public String getHbaseTableName() {
-    return hbaseTableName;
-  }
-
-  public boolean[] getIsColumnKeys() {
-    return isColumnKeys;
-  }
-
-  public int getNumRowKeys() {
-    return numRowKeys;
-  }
-
-  public boolean[] getIsColumnValues() {
-    return isColumnValues;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
deleted file mode 100644
index c05c5bb..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-
-public class HBaseBinarySerializerDeserializer {
-
- public static Datum deserialize(Column col, byte[] bytes) throws IOException {
- Datum datum;
- switch (col.getDataType().getType()) {
- case INT1:
- case INT2:
- datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes));
- break;
- case INT4:
- datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes));
- break;
- case INT8:
- if (bytes.length == 4) {
- datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toInt(bytes));
- } else {
- datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toLong(bytes));
- }
- break;
- case FLOAT4:
- datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes));
- break;
- case FLOAT8:
- datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes));
- break;
- case TEXT:
- datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
- break;
- default:
- datum = NullDatum.get();
- break;
- }
- return datum;
- }
-
- public static byte[] serialize(Column col, Datum datum) throws IOException {
- if (datum == null || datum instanceof NullDatum) {
- return null;
- }
-
- byte[] bytes;
- switch (col.getDataType().getType()) {
- case INT1:
- case INT2:
- bytes = Bytes.toBytes(datum.asInt2());
- break;
- case INT4:
- bytes = Bytes.toBytes(datum.asInt4());
- break;
- case INT8:
- bytes = Bytes.toBytes(datum.asInt8());
- break;
- case FLOAT4:
- bytes = Bytes.toBytes(datum.asFloat4());
- break;
- case FLOAT8:
- bytes = Bytes.toBytes(datum.asFloat8());
- break;
- case TEXT:
- bytes = Bytes.toBytes(datum.asChars());
- break;
- default:
- bytes = null;
- break;
- }
-
- return bytes;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
deleted file mode 100644
index 43ad7f3..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.StorageFragmentProtos.HBaseFragmentProto;
-
-public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
- @Expose
- private String tableName;
- @Expose
- private String hbaseTableName;
- @Expose
- private byte[] startRow;
- @Expose
- private byte[] stopRow;
- @Expose
- private String regionLocation;
- @Expose
- private boolean last;
- @Expose
- private long length;
-
- public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) {
- this.tableName = tableName;
- this.hbaseTableName = hbaseTableName;
- this.startRow = startRow;
- this.stopRow = stopRow;
- this.regionLocation = regionLocation;
- this.last = false;
- }
-
- public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
- HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
- builder.mergeFrom(raw);
- builder.build();
- init(builder.build());
- }
-
- private void init(HBaseFragmentProto proto) {
- this.tableName = proto.getTableName();
- this.hbaseTableName = proto.getHbaseTableName();
- this.startRow = proto.getStartRow().toByteArray();
- this.stopRow = proto.getStopRow().toByteArray();
- this.regionLocation = proto.getRegionLocation();
- this.length = proto.getLength();
- this.last = proto.getLast();
- }
-
- @Override
- public int compareTo(HBaseFragment t) {
- return Bytes.compareTo(startRow, t.startRow);
- }
-
- @Override
- public String getTableName() {
- return tableName;
- }
-
- @Override
- public String getKey() {
- return new String(startRow);
- }
-
- @Override
- public boolean isEmpty() {
- return startRow == null || stopRow == null;
- }
-
- @Override
- public long getLength() {
- return length;
- }
-
- public void setLength(long length) {
- this.length = length;
- }
-
- @Override
- public String[] getHosts() {
- return new String[] {regionLocation};
- }
-
- public Object clone() throws CloneNotSupportedException {
- HBaseFragment frag = (HBaseFragment) super.clone();
- frag.tableName = tableName;
- frag.hbaseTableName = hbaseTableName;
- frag.startRow = startRow;
- frag.stopRow = stopRow;
- frag.regionLocation = regionLocation;
- frag.last = last;
- frag.length = length;
- return frag;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof HBaseFragment) {
- HBaseFragment t = (HBaseFragment) o;
- if (tableName.equals(t.tableName)
- && Bytes.equals(startRow, t.startRow)
- && Bytes.equals(stopRow, t.stopRow)) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow);
- }
-
- @Override
- public String toString() {
- return "\"fragment\": {\"tableName\": \""+ tableName + "\", hbaseTableName\": \"" + hbaseTableName + "\"" +
- ", \"startRow\": \"" + new String(startRow) + "\"" +
- ", \"stopRow\": \"" + new String(stopRow) + "\"" +
- ", \"length\": \"" + length + "\"}" ;
- }
-
- @Override
- public FragmentProto getProto() {
- HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
- builder.setTableName(tableName)
- .setHbaseTableName(hbaseTableName)
- .setStartRow(ByteString.copyFrom(startRow))
- .setStopRow(ByteString.copyFrom(stopRow))
- .setLast(last)
- .setLength(length)
- .setRegionLocation(regionLocation);
-
- FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
- fragmentBuilder.setId(this.tableName);
- fragmentBuilder.setContents(builder.buildPartial().toByteString());
- fragmentBuilder.setStoreType(StoreType.HBASE.name());
- return fragmentBuilder.build();
- }
-
- public byte[] getStartRow() {
- return startRow;
- }
-
- public byte[] getStopRow() {
- return stopRow;
- }
-
- public String getRegionLocation() {
- return regionLocation;
- }
-
- public boolean isLast() {
- return last;
- }
-
- public void setLast(boolean last) {
- this.last = last;
- }
-
- public String getHbaseTableName() {
- return hbaseTableName;
- }
-
- public void setHbaseTableName(String hbaseTableName) {
- this.hbaseTableName = hbaseTableName;
- }
-
- public void setStartRow(byte[] startRow) {
- this.startRow = startRow;
- }
-
- public void setStopRow(byte[] stopRow) {
- this.stopRow = stopRow;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
deleted file mode 100644
index 50f61a8..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public class HBasePutAppender extends AbstractHBaseAppender {
- private HTableInterface htable;
- private long totalNumBytes;
-
- public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
- Schema schema, TableMeta meta, Path stagingDir) {
- super(conf, taskAttemptId, schema, meta, stagingDir);
- }
-
- @Override
- public void init() throws IOException {
- super.init();
-
- Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
- HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, StoreType.HBASE))
- .getConnection(hbaseConf);
- htable = hconn.getTable(columnMapping.getHbaseTableName());
- htable.setAutoFlushTo(false);
- htable.setWriteBufferSize(5 * 1024 * 1024);
- }
-
- @Override
- public void addTuple(Tuple tuple) throws IOException {
- byte[] rowkey = getRowKeyBytes(tuple);
- totalNumBytes += rowkey.length;
- Put put = new Put(rowkey);
- readKeyValues(tuple, rowkey);
-
- for (int i = 0; i < columnNum; i++) {
- if (isRowKeyMappings[i]) {
- continue;
- }
- Datum datum = tuple.get(i);
- byte[] value;
- if (isBinaryColumns[i]) {
- value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
- } else {
- value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
- }
-
- if (isColumnKeys[i]) {
- columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
- } else if (isColumnValues[i]) {
- columnValueDatas[columnKeyValueDataIndexes[i]] = value;
- } else {
- put.add(mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
- totalNumBytes += value.length;
- }
- }
-
- for (int i = 0; i < columnKeyDatas.length; i++) {
- put.add(columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
- totalNumBytes += columnKeyDatas[i].length + columnValueDatas[i].length;
- }
-
- htable.put(put);
-
- if (enabledStats) {
- stats.incrementRow();
- stats.setNumBytes(totalNumBytes);
- }
- }
-
- @Override
- public void flush() throws IOException {
- htable.flushCommits();
- }
-
- @Override
- public long getEstimatedOutputSize() throws IOException {
- return 0;
- }
-
- @Override
- public void close() throws IOException {
- if (htable != null) {
- htable.flushCommits();
- htable.close();
- }
- if (enabledStats) {
- stats.setNumBytes(totalNumBytes);
- }
- }
-}