You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:58 UTC
[24/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
deleted file mode 100644
index 16c4faa..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.SchemaObject;
-import org.apache.tajo.catalog.statistics.TableStats;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Scanner Interface
- */
-public interface Scanner extends SchemaObject, Closeable {
-
- void init() throws IOException;
-
- /**
- * It returns one tuple at each call.
- *
- * @return retrieve null if the scanner has no more tuples.
- * Otherwise it returns one tuple.
- *
- * @throws IOException if internal I/O error occurs during next method
- */
- Tuple next() throws IOException;
-
- /**
- * Reset the cursor. After executed, the scanner
- * will retrieve the first tuple.
- *
- * @throws IOException if internal I/O error occurs during reset method
- */
- void reset() throws IOException;
-
- /**
- * Close scanner
- *
- * @throws IOException if internal I/O error occurs during close method
- */
- void close() throws IOException;
-
-
- /**
- * It returns if the projection is executed in the underlying scanner layer.
- *
- * @return true if this scanner can project the given columns.
- */
- boolean isProjectable();
-
- /**
- * Set target columns
- * @param targets columns to be projected
- */
- void setTarget(Column [] targets);
-
- /**
- * It returns if the selection is executed in the underlying scanner layer.
- *
- * @return true if this scanner can filter tuples against a given condition.
- */
- boolean isSelectable();
-
- /**
- * Set a search condition
- * @param expr to be searched
- *
- * TODO - to be changed Object type
- */
- void setSearchCondition(Object expr);
-
- /**
- * It returns if the file is splittable.
- *
- * @return true if this scanner can split the a file.
- */
- boolean isSplittable();
-
- /**
- * How much of the input has the Scanner consumed
- * @return progress from <code>0.0</code> to <code>1.0</code>.
- */
- float getProgress();
-
- TableStats getInputStats();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
deleted file mode 100644
index 894e7ee..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import java.io.IOException;
-
-public interface SeekableScanner extends Scanner {
-
- public abstract long getNextOffset() throws IOException;
-
- public abstract void seek(long offset) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
deleted file mode 100644
index 564a9f5..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-@Deprecated
-public interface SerializerDeserializer {
-
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
-
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
deleted file mode 100644
index 3579674..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class SplitLineReader extends LineReader {
- public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
- super(in, recordDelimiterBytes);
- }
-
- public SplitLineReader(InputStream in, Configuration conf,
- byte[] recordDelimiterBytes) throws IOException {
- super(in, conf, recordDelimiterBytes);
- }
-
- public boolean needAdditionalRecordAfterSplit() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
deleted file mode 100644
index cc85c1d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-public abstract class Storage {
- protected final Configuration conf;
-
- public Storage(final Configuration conf) {
- this.conf = conf;
- }
-
- public Configuration getConf() {
- return this.conf;
- }
-
- public abstract Appender getAppender(TableMeta meta, Path path)
- throws IOException;
-
- public abstract Scanner openScanner(Schema schema, FileFragment[] tablets)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
deleted file mode 100644
index 220eb6c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ /dev/null
@@ -1,812 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * StorageManager
- */
-public class StorageManager {
- private final Log LOG = LogFactory.getLog(StorageManager.class);
-
- protected final TajoConf conf;
- protected final FileSystem fs;
- protected final Path tableBaseDir;
- protected final boolean blocksMetadataEnabled;
- private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
-
- private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
-
- /**
- * Cache of scanner handlers for each storage type.
- */
- protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-
- /**
- * Cache of appender handlers for each storage type.
- */
- protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
-
- /**
- * Cache of constructors for each class. Pins the classes so they
- * can't be garbage collected until ReflectionUtils can be collected.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
/**
 * Creates a manager for the warehouse directory configured in {@code conf}.
 *
 * @param conf Tajo configuration; its warehouse directory determines {@link #fs}
 * @throws IOException if the warehouse file system cannot be obtained
 */
private StorageManager(TajoConf conf) throws IOException {
  this.conf = conf;
  this.tableBaseDir = TajoConf.getWarehouseDir(conf);
  this.fs = tableBaseDir.getFileSystem(conf);
  this.blocksMetadataEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);

  // Without block metadata, getSplits cannot attach disk ids to fragments.
  if (!blocksMetadataEnabled) {
    LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
  }
}
-
- public static StorageManager getStorageManager(TajoConf conf) throws IOException {
- return getStorageManager(conf, null);
- }
-
- public static synchronized StorageManager getStorageManager (
- TajoConf conf, Path warehouseDir) throws IOException {
-
- URI uri;
- TajoConf localConf = new TajoConf(conf);
- if (warehouseDir != null) {
- localConf.setVar(ConfVars.WAREHOUSE_DIR, warehouseDir.toUri().toString());
- }
-
- uri = TajoConf.getWarehouseDir(localConf).toUri();
-
- String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
-
- if(storageManagers.containsKey(key)) {
- StorageManager sm = storageManagers.get(key);
- return sm;
- } else {
- StorageManager storageManager = new StorageManager(localConf);
- storageManagers.put(key, storageManager);
- return storageManager;
- }
- }
-
- public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
- throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- FileStatus status = fs.getFileStatus(path);
- return getFileScanner(meta, schema, path, status);
- }
-
- public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
- throws IOException {
- FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
- return getScanner(meta, schema, fragment);
- }
-
- public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
- return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
- }
-
- public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
- return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target);
- }
-
- public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
- return getScanner(meta, schema, fragment, schema);
- }
-
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
- public Path getWarehouseDir() {
- return this.tableBaseDir;
- }
-
- public void delete(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- fs.delete(tablePath, true);
- }
-
- public boolean exists(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- return fileSystem.exists(path);
- }
-
- /**
- * This method deletes only data contained in the given path.
- *
- * @param path The path in which data are deleted.
- * @throws IOException
- */
- public void deleteData(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- FileStatus[] fileLists = fileSystem.listStatus(path);
- for (FileStatus status : fileLists) {
- fileSystem.delete(status.getPath(), true);
- }
- }
-
- public Path getTablePath(String tableName) {
- return new Path(tableBaseDir, tableName);
- }
-
- public Appender getAppender(TableMeta meta, Schema schema, Path path)
- throws IOException {
- Appender appender;
-
- Class<? extends FileAppender> appenderClass;
-
- String handlerName = meta.getStoreType().name().toLowerCase();
- appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
- if (appenderClass == null) {
- appenderClass = conf.getClass(
- String.format("tajo.storage.appender-handler.%s.class",
- meta.getStoreType().name().toLowerCase()), null,
- FileAppender.class);
- APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
- }
-
- if (appenderClass == null) {
- throw new IOException("Unknown Storage Type: " + meta.getStoreType());
- }
-
- appender = newAppenderInstance(appenderClass, conf, meta, schema, path);
-
- return appender;
- }
-
- public TableMeta getTableMeta(Path tablePath) throws IOException {
- TableMeta meta;
-
- FileSystem fs = tablePath.getFileSystem(conf);
- Path tableMetaPath = new Path(tablePath, ".meta");
- if (!fs.exists(tableMetaPath)) {
- throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
- }
-
- FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
- CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
- CatalogProtos.TableProto.getDefaultInstance());
- meta = new TableMeta(tableProto);
-
- return meta;
- }
-
- public FileFragment[] split(String tableName) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fragmentSize);
- }
-
- public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
- listTablets.add(tablet);
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public FileFragment[] split(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
- }
-
- public FileFragment[] split(String tableName, Path tablePath) throws IOException {
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- private FileFragment[] split(String tableName, Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
- } else {
- listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
- Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
- } else {
- listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public long calculateSize(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- long totalSize = 0;
-
- if (fs.exists(tablePath)) {
- totalSize = fs.getContentSummary(tablePath).getLength();
- }
-
- return totalSize;
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // FileInputFormat Area
- /////////////////////////////////////////////////////////////////////////////
-
- public static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /**
- * Proxy PathFilter that accepts a path only if all filters given in the
- * constructor do. Used by the listPaths() to apply the built-in
- * hiddenFileFilter together with a user provided one (if any).
- */
- private static class MultiPathFilter implements PathFilter {
- private List<PathFilter> filters;
-
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
-
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (!filter.accept(path)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * List input directories.
- * Subclasses may override to, e.g., select only files matching a regular
- * expression.
- *
- * @return array of FileStatus objects
- * @throws IOException if zero items.
- */
- protected List<FileStatus> listStatus(Path... dirs) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
-
- List<IOException> errors = new ArrayList<IOException>();
-
- // creates a MultiPathFilter with the hiddenFileFilter and the
- // user provided one (if any).
- List<PathFilter> filters = new ArrayList<PathFilter>();
- filters.add(hiddenFileFilter);
-
- PathFilter inputFilter = new MultiPathFilter(filters);
-
- for (int i = 0; i < dirs.length; ++i) {
- Path p = dirs[i];
-
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] matches = fs.globStatus(p, inputFilter);
- if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + p));
- } else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
- } else {
- for (FileStatus globStat : matches) {
- if (globStat.isDirectory()) {
- for (FileStatus stat : fs.listStatus(globStat.getPath(),
- inputFilter)) {
- result.add(stat);
- }
- } else {
- result.add(globStat);
- }
- }
- }
- }
-
- if (!errors.isEmpty()) {
- throw new InvalidInputException(errors);
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- /**
- * Is the given filename splitable? Usually, true, but if the file is
- * stream compressed, it will not be.
- * <p/>
- * <code>FileInputFormat</code> implementations can override this and return
- * <code>false</code> to ensure that individual input files are never split-up
- * so that Mappers process entire files.
- *
- *
- * @param path the file name to check
- * @param status get the file length
- * @return is this file isSplittable?
- */
- protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
- Scanner scanner = getFileScanner(meta, schema, path, status);
- boolean split = scanner.isSplittable();
- scanner.close();
- return split;
- }
-
- private static final double SPLIT_SLOP = 1.1; // 10% slop
-
- protected int getBlockIndex(BlockLocation[] blkLocations,
- long offset) {
- for (int i = 0; i < blkLocations.length; i++) {
- // is the offset inside this block?
- if ((blkLocations[i].getOffset() <= offset) &&
- (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
- return i;
- }
- }
- BlockLocation last = blkLocations[blkLocations.length - 1];
- long fileLength = last.getOffset() + last.getLength() - 1;
- throw new IllegalArgumentException("Offset " + offset +
- " is outside of file (0.." +
- fileLength + ")");
- }
-
- /**
- * A factory that makes the split for this class. It can be overridden
- * by sub-classes to make sub-types
- */
- protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
- return new FileFragment(fragmentId, file, start, length);
- }
-
- protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
- String[] hosts) {
- return new FileFragment(fragmentId, file, start, length, hosts);
- }
-
- protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
- throws IOException {
- return new FileFragment(fragmentId, file, blockLocation);
- }
-
- // for Non Splittable. eg, compressed gzip TextFile
- protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
- BlockLocation[] blkLocations) throws IOException {
-
- Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
- for (BlockLocation blockLocation : blkLocations) {
- for (String host : blockLocation.getHosts()) {
- if (hostsBlockMap.containsKey(host)) {
- hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
- } else {
- hostsBlockMap.put(host, 1);
- }
- }
- }
-
- List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
- Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
- @Override
- public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
- return v1.getValue().compareTo(v2.getValue());
- }
- });
-
- String[] hosts = new String[blkLocations[0].getHosts().length];
-
- for (int i = 0; i < hosts.length; i++) {
- Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
- hosts[i] = entry.getKey();
- }
- return new FileFragment(fragmentId, file, start, length, hosts);
- }
-
- /**
- * Get the minimum split size
- *
- * @return the minimum number of bytes that can be in a split
- */
- public long getMinSplitSize() {
- return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
- }
-
- /**
- * Get Disk Ids by Volume Bytes
- */
- private int[] getDiskIds(VolumeId[] volumeIds) {
- int[] diskIds = new int[volumeIds.length];
- for (int i = 0; i < volumeIds.length; i++) {
- int diskId = -1;
- if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
- diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
- }
- diskIds[i] = diskId;
- }
- return diskIds;
- }
-
- /**
- * Generate the map of host and make them into Volume Ids.
- *
- */
- private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
- Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
- for (FileFragment frag : frags) {
- String[] hosts = frag.getHosts();
- int[] diskIds = frag.getDiskIds();
- for (int i = 0; i < hosts.length; i++) {
- Set<Integer> volumeList = volumeMap.get(hosts[i]);
- if (volumeList == null) {
- volumeList = new HashSet<Integer>();
- volumeMap.put(hosts[i], volumeList);
- }
-
- if (diskIds.length > 0 && diskIds[i] > -1) {
- volumeList.add(diskIds[i]);
- }
- }
- }
-
- return volumeMap;
- }
- /**
- * Generate the list of files and make them into FileSplits.
- *
- * @throws IOException
- */
- public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
- throws IOException {
- // generate splits'
-
- List<FileFragment> splits = Lists.newArrayList();
- List<FileFragment> volumeSplits = Lists.newArrayList();
- List<BlockLocation> blockLocations = Lists.newArrayList();
-
- for (Path p : inputs) {
- FileSystem fs = p.getFileSystem(conf);
-
- ArrayList<FileStatus> files = Lists.newArrayList();
- if (fs.isFile(p)) {
- files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
- } else {
- files.addAll(listStatus(p));
- }
-
- int previousSplitSize = splits.size();
- for (FileStatus file : files) {
- Path path = file.getPath();
- long length = file.getLen();
- if (length > 0) {
- // Get locations of blocks of file
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- boolean splittable = isSplittable(meta, schema, path, file);
- if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-
- if (splittable) {
- for (BlockLocation blockLocation : blkLocations) {
- volumeSplits.add(makeSplit(tableName, path, blockLocation));
- }
- blockLocations.addAll(Arrays.asList(blkLocations));
-
- } else { // Non splittable
- long blockSize = blkLocations[0].getLength();
- if (blockSize >= length) {
- blockLocations.addAll(Arrays.asList(blkLocations));
- for (BlockLocation blockLocation : blkLocations) {
- volumeSplits.add(makeSplit(tableName, path, blockLocation));
- }
- } else {
- splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
- }
- }
-
- } else {
- if (splittable) {
-
- long minSize = Math.max(getMinSplitSize(), 1);
-
- long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
- long splitSize = Math.max(minSize, blockSize);
- long bytesRemaining = length;
-
- // for s3
- while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- bytesRemaining -= splitSize;
- }
- if (bytesRemaining > 0) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
- blkLocations[blkIndex].getHosts()));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
- }
- }
- } else {
- //for zero length files
- splits.add(makeSplit(tableName, path, 0, length));
- }
- }
- if(LOG.isDebugEnabled()){
- LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
- }
- }
-
- // Combine original fileFragments with new VolumeId information
- setVolumeMeta(volumeSplits, blockLocations);
- splits.addAll(volumeSplits);
- LOG.info("Total # of splits: " + splits.size());
- return splits;
- }
-
-  /**
-   * Attaches disk (volume) ids to splits that were created one per HDFS block.
-   *
-   * <p>{@code splits} and {@code blockLocations} must be index-aligned: the i-th split was made from
-   * the i-th block location. Nothing is done if either list is empty. If their sizes differ, a warning
-   * is logged and the splits are left untouched.
-   *
-   * <p>Storage locations are requested in batches of at most {@link DFSConfigKeys#DFS_LIST_LIMIT}
-   * entries.
-   *
-   * @param splits splits whose disk ids are filled in, in the same order as {@code blockLocations}
-   * @param blockLocations block locations the splits were created from
-   * @throws IOException if the block storage location lookup fails
-   */
-  private void setVolumeMeta(List<FileFragment> splits, final List<BlockLocation> blockLocations)
-      throws IOException {
-
-    int locationSize = blockLocations.size();
-    int splitSize = splits.size();
-    if (locationSize == 0 || splitSize == 0) {
-      return;
-    }
-
-    if (locationSize != splitSize) {
-      // splits and locations don't match up
-      LOG.warn("Number of block locations not equal to number of splits: "
-          + "#locations=" + locationSize
-          + " #splits=" + splitSize);
-      return;
-    }
-
-    // FileSystem.get(conf) yields the *default* file system, which need not be HDFS even when the
-    // input paths are; casting it blindly would throw ClassCastException.
-    FileSystem defaultFs = FileSystem.get(conf);
-    if (!(defaultFs instanceof DistributedFileSystem)) {
-      LOG.warn("Default file system is not HDFS (" + defaultFs.getClass().getName()
-          + "); skipping volume id lookup for " + splitSize + " splits");
-      return;
-    }
-    DistributedFileSystem fs = (DistributedFileSystem) defaultFs;
-
-    // A non-positive batch size would never advance the loop below.
-    int lsLimit = Math.max(1, conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT));
-    int blockLocationIdx = 0;
-
-    Iterator<FileFragment> iter = splits.iterator();
-    while (blockLocationIdx < locationSize) {
-      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
-      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
-      // BlockStorageLocation contains additional volume information for each replica of each block.
-      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
-
-      int returned = blockStorageLocations == null ? 0 : blockStorageLocations.length;
-      if (returned != subSize) {
-        // Without a one-to-one answer the split/location alignment is lost; stop rather than
-        // attach disk ids to the wrong splits (or loop forever on an empty answer).
-        LOG.warn("Expected " + subSize + " block storage locations but got " + returned
-            + "; volume ids are incomplete");
-        return;
-      }
-
-      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-        iter.next().setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
-      }
-      blockLocationIdx += subSize;
-    }
-    LOG.info("# of splits with volumeId " + splitSize);
-  }
-
-  /**
-   * Aggregates several {@link IOException}s into one exception.
-   * The message lists at most the first {@value #MESSAGE_LIMIT} underlying messages, one per line,
-   * and ends with a "skipped" marker if more were collected.
-   */
-  private static class InvalidInputException extends IOException {
-    private static final int MESSAGE_LIMIT = 10;
-
-    final List<IOException> errors;
-
-    public InvalidInputException(List<IOException> errors) {
-      // Treat a missing list as "no errors" so getMessage() cannot fail with an NPE.
-      this.errors = errors == null ? new ArrayList<IOException>() : errors;
-    }
-
-    @Override
-    public String getMessage() {
-      // Single-threaded string building: StringBuilder, not the synchronized StringBuffer.
-      StringBuilder sb = new StringBuilder();
-      int messageLimit = Math.min(errors.size(), MESSAGE_LIMIT);
-      for (int i = 0; i < messageLimit; i++) {
-        sb.append(errors.get(i).getMessage()).append("\n");
-      }
-
-      if (messageLimit < errors.size()) {
-        sb.append("skipped .....").append("\n");
-      }
-
-      return sb.toString();
-    }
-  }
-
-  /** Constructor parameter types looked up on scanner classes: (conf, schema, meta, fragment). */
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = new Class<?>[] {
-      Configuration.class, Schema.class, TableMeta.class, FileFragment.class
-  };
-
-  /** Constructor parameter types looked up on appender classes: (conf, schema, meta, path). */
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = new Class<?>[] {
-      Configuration.class, Schema.class, TableMeta.class, Path.class
-  };
-
-  /**
-   * Reflectively creates a scanner through its {@code (Configuration, Schema, TableMeta, FileFragment)}
-   * constructor. The constructor is looked up once per class and kept in {@code CONSTRUCTOR_CACHE}.
-   *
-   * @return the new scanner instance
-   * @throws RuntimeException wrapping any reflection or construction failure
-   */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
-                                         Fragment fragment) {
-    try {
-      Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (ctor == null) {
-        ctor = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
-        ctor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, ctor);
-      }
-      return ctor.newInstance(conf, schema, meta, fragment);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Reflectively creates an appender through its {@code (Configuration, Schema, TableMeta, Path)}
-   * constructor. The constructor is looked up once per class and kept in {@code CONSTRUCTOR_CACHE}.
-   *
-   * <p>Note that the parameter order of this method ({@code meta}, then {@code schema}) differs from
-   * the constructor's ({@code schema}, then {@code meta}); the arguments are reordered below.
-   *
-   * <p>NOTE(review): {@code CONSTRUCTOR_CACHE} is keyed by class only and is also used by
-   * {@code newScannerInstance}; a class instantiated through both would get whichever constructor
-   * was cached first.
-   *
-   * @return the new appender instance
-   * @throws RuntimeException wrapping any reflection or construction failure
-   */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema,
-                                          Path path) {
-    T result;
-    try {
-      @SuppressWarnings("unchecked") // the cache only ever holds constructors of the key class
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, schema, meta, path});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
-   * Resolves the scanner class configured for {@code storeType}.
-   *
-   * <p>The class is read from {@code tajo.storage.scanner-handler.<store type>.class}, with the store
-   * type name lower-cased, and cached in {@code SCANNER_HANDLER_CACHE} once found.
-   *
-   * @throws IOException if no scanner class is configured for {@code storeType}
-   */
-  public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
-    // Locale.ROOT: the default locale can change case mapping (e.g. the Turkish dotless i), which
-    // would produce a configuration key that does not exist.
-    String handlerName = storeType.name().toLowerCase(java.util.Locale.ROOT);
-    Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
-    if (scannerClass == null) {
-      scannerClass = conf.getClass(
-          String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
-      if (scannerClass == null) {
-        throw new IOException("Unknown Storage Type: " + storeType.name());
-      }
-      // Cache only successful lookups; some Map implementations (e.g. ConcurrentHashMap) reject nulls.
-      SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
-    }
-
-    return scannerClass;
-  }
-
- public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
- if (fragment instanceof FileFragment) {
- FileFragment fileFragment = (FileFragment)fragment;
- if (fileFragment.getEndKey() == 0) {
- Scanner scanner = new NullScanner(conf, schema, meta, fileFragment);
- scanner.setTarget(target.toArray());
-
- return scanner;
- }
- }
-
- Scanner scanner;
-
- Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
- scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
- if (scanner.isProjectable()) {
- scanner.setTarget(target.toArray());
- }
-
- return scanner;
- }
-
- public static synchronized SeekableScanner getSeekableScanner(
- TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException {
- return (SeekableScanner)getStorageManager(conf, null).getScanner(meta, schema, fragment, target);
- }
-
- public static synchronized SeekableScanner getSeekableScanner(
- TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException {
-
- FileSystem fs = path.getFileSystem(conf);
- FileStatus status = fs.getFileStatus(path);
- FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
-
- return getSeekableScanner(conf, meta, schema, fragment, schema);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
deleted file mode 100644
index f998ebf..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.KeyValueSet;
-import parquet.hadoop.ParquetOutputFormat;
-import sun.nio.ch.DirectBuffer;
-
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Static helpers shared by storage code: row-size estimates, path building, data-file sequence
- * numbers, and low-level buffer/stream utilities.
- */
-public class StorageUtil extends StorageConstants {
-
-  /** Estimates the byte size of one row as the sum of {@link #getColByteSize(Column)} over all columns. */
-  public static int getRowByteSize(Schema schema) {
-    int sum = 0;
-    for (Column col : schema.getColumns()) {
-      sum += StorageUtil.getColByteSize(col);
-    }
-
-    return sum;
-  }
-
-  /**
-   * Returns a fixed size estimate, in bytes, for the column's type. TEXT and BLOB use a nominal 256;
-   * types not listed here return 0.
-   */
-  public static int getColByteSize(Column col) {
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        return 1;
-      case CHAR:
-        return 1;
-      case BIT:
-        return 1;
-      case INT2:
-        return 2;
-      case INT4:
-        return 4;
-      case INT8:
-        return 8;
-      case FLOAT4:
-        return 4;
-      case FLOAT8:
-        return 8;
-      case INET4:
-        return 4;
-      case INET6:
-        return 32;
-      case TEXT:
-        return 256;
-      case BLOB:
-        return 256;
-      case DATE:
-        return 4;
-      case TIME:
-        return 8;
-      case TIMESTAMP:
-        return 8;
-      default:
-        return 0;
-    }
-  }
-
-  /** Writes {@code meta} as a protobuf message to the file {@code <tableroot>/.meta}. */
-  public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
-    FileSystem fs = tableroot.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
-    try {
-      FileUtil.writeProto(out, meta.getProto());
-      out.flush();
-    } finally {
-      // Close even when serialization fails so the stream is not leaked.
-      out.close();
-    }
-  }
-
-  /** Returns {@code parent/childs[0]/childs[1]/...}. */
-  public static Path concatPath(String parent, String...childs) {
-    return concatPath(new Path(parent), childs);
-  }
-
-  /** Returns {@code parent/childs[0]/childs[1]/...}. */
-  public static Path concatPath(Path parent, String...childs) {
-    StringBuilder sb = new StringBuilder();
-
-    for (int i = 0; i < childs.length; i++) {
-      sb.append(childs[i]);
-      if (i < childs.length - 1) {
-        sb.append("/");
-      }
-    }
-
-    return new Path(parent, sb.toString());
-  }
-
-  static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
-  static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";
-
-  /**
-   * Written files can be one of two forms: "part-[0-9]*-[0-9]*" (0.8) or "part-[0-9]*-[0-9]*-[0-9]*" (0.9).
-   *
-   * This method returns the largest sequence number (the last token of a 0.9-style name) over all
-   * matching files directly under {@code path} and, when {@code recursive} is true, over all of its
-   * subdirectories. 0.8-style names carry no sequence number. If {@code path} is not a directory, or
-   * no sequence number can be found, it returns -1.
-   *
-   * @param fs file system containing {@code path}
-   * @param path directory to inspect
-   * @param recursive whether to descend into subdirectories (e.g. partition directories)
-   * @return The maximum sequence number, or -1
-   * @throws IOException if listing the directory fails
-   */
-  public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException {
-    if (!fs.isDirectory(path)) {
-      return -1;
-    }
-
-    FileStatus[] files = fs.listStatus(path);
-
-    if (files == null || files.length == 0) {
-      return -1;
-    }
-
-    // Take the maximum over every entry: listing order is not numeric order ("...-10" vs "...-9"),
-    // and files at this level must not hide a larger value found in a subdirectory.
-    int maxValue = -1;
-    for (FileStatus eachFile : files) {
-      if (eachFile.isDirectory() && recursive) {
-        // In the case of partition table, take the largest value within all partition dirs.
-        maxValue = Math.max(maxValue, getMaxFileSequence(fs, eachFile.getPath(), recursive));
-      } else {
-        maxValue = Math.max(maxValue, getFileSequence(eachFile.getPath().getName()));
-      }
-    }
-    return maxValue;
-  }
-
-  /**
-   * Returns the sequence number encoded in a data file name, or -1 if the name matches neither
-   * pattern, is a 0.8-style name without a sequence, or has an unparsable sequence.
-   */
-  private static int getFileSequence(String fileName) {
-    if (!fileName.matches(fileNamePatternV08) && !fileName.matches(fileNamePatternV09)) {
-      return -1;
-    }
-
-    // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>
-    // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
-    String[] pathTokens = fileName.split("-");
-    if (pathTokens.length != 4) {
-      return -1;
-    }
-    try {
-      return Integer.parseInt(pathTokens[3]);
-    } catch (NumberFormatException e) {
-      // e.g. a digit run too long for an int; treat like a name without a sequence.
-      return -1;
-    }
-  }
-
-  /**
-   * Releases {@code buffer}: a direct buffer's native memory is freed through the JDK-internal
-   * {@code sun.nio.ch.DirectBuffer} cleaner, a heap buffer is only cleared. Null is ignored.
-   */
-  public static void closeBuffer(ByteBuffer buffer) {
-    if (buffer != null) {
-      if (buffer.isDirect()) {
-        DirectBuffer directBuffer = (DirectBuffer) buffer;
-        // Slices and duplicates of a direct buffer have no cleaner of their own.
-        if (directBuffer.cleaner() != null) {
-          directBuffer.cleaner().clean();
-        }
-      } else {
-        buffer.clear();
-      }
-    }
-  }
-
-  /**
-   * Reads up to {@code length} bytes into {@code buffer} starting at {@code offset}, looping over
-   * short reads.
-   *
-   * @return the number of bytes read; if the stream ends before any byte is read, the negative value
-   *         returned by {@link InputStream#read(byte[], int, int)}
-   */
-  public static int readFully(InputStream is, byte[] buffer, int offset, int length)
-      throws IOException {
-    int nread = 0;
-    while (nread < length) {
-      int nbytes = is.read(buffer, offset + nread, length - nread);
-      if (nbytes < 0) {
-        return nread > 0 ? nread : nbytes;
-      }
-      nread += nbytes;
-    }
-    return nread;
-  }
-
-  /**
-   * Similar to readFully(). Skips bytes in a loop.
-   * @param in The DataInput to skip bytes from
-   * @param len number of bytes to skip.
-   * @throws java.io.IOException if it could not skip requested number of bytes
-   * for any reason (including EOF)
-   */
-  public static void skipFully(DataInput in, int len) throws IOException {
-    int amt = len;
-    while (amt > 0) {
-      int ret = in.skipBytes(amt);
-      if (ret == 0) {
-        // skip may return 0 even if we're not at EOF, so consume one byte with readByte().
-        // DataInput.readByte() reports EOF by throwing EOFException; it never returns -1 for EOF
-        // (a returned -1 is the valid byte 0xFF), so the EOF case is handled in the catch.
-        try {
-          in.readByte();
-        } catch (EOFException e) {
-          EOFException eof = new EOFException("Premature EOF from inputStream after " +
-              "skipping " + (len - amt) + " byte(s).");
-          eof.initCause(e);
-          throw eof;
-        }
-        ret = 1;
-      }
-      amt -= ret;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
deleted file mode 100644
index a2c08de..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-
-/**
- * This class is not thread-safe.
- */
-public class TableStatistics {
- private static final Log LOG = LogFactory.getLog(TableStatistics.class);
- private Schema schema;
- private Tuple minValues;
- private Tuple maxValues;
- private long [] numNulls;
- private long numRows = 0;
- private long numBytes = 0;
-
- private boolean [] comparable;
-
- public TableStatistics(Schema schema) {
- this.schema = schema;
- minValues = new VTuple(schema.size());
- maxValues = new VTuple(schema.size());
-
- numNulls = new long[schema.size()];
- comparable = new boolean[schema.size()];
-
- DataType type;
- for (int i = 0; i < schema.size(); i++) {
- type = schema.getColumn(i).getDataType();
- if (type.getType() == Type.PROTOBUF) {
- comparable[i] = false;
- } else {
- comparable[i] = true;
- }
- }
- }
-
- public Schema getSchema() {
- return this.schema;
- }
-
- public void incrementRow() {
- numRows++;
- }
-
- public long getNumRows() {
- return this.numRows;
- }
-
- public void setNumBytes(long bytes) {
- this.numBytes = bytes;
- }
-
- public long getNumBytes() {
- return this.numBytes;
- }
-
- public void analyzeField(int idx, Datum datum) {
- if (datum instanceof NullDatum) {
- numNulls[idx]++;
- return;
- }
-
- if (comparable[idx]) {
- if (!maxValues.contains(idx) ||
- maxValues.get(idx).compareTo(datum) < 0) {
- maxValues.put(idx, datum);
- }
- if (!minValues.contains(idx) ||
- minValues.get(idx).compareTo(datum) > 0) {
- minValues.put(idx, datum);
- }
- }
- }
-
- public TableStats getTableStat() {
- TableStats stat = new TableStats();
-
- ColumnStats columnStats;
- for (int i = 0; i < schema.size(); i++) {
- columnStats = new ColumnStats(schema.getColumn(i));
- columnStats.setNumNulls(numNulls[i]);
- if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) {
- columnStats.setMinValue(minValues.get(i));
- } else {
- LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
- ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
- }
- if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) {
- columnStats.setMaxValue(maxValues.get(i));
- } else {
- LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
- ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
- }
- stat.addColumnStat(columnStats);
- }
-
- stat.setNumRows(this.numRows);
- stat.setNumBytes(this.numBytes);
-
- return stat;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
deleted file mode 100644
index ab8816b..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.NumberUtil;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.TimeZone;
-
-// Compatibility with Apache Hive
-/**
- * Serializes datums to, and parses them from, their text form.
- *
- * <p>Most types are written via {@code Datum.asTextBytes()}. TIME and TIMESTAMP are formatted with
- * {@code TimeZone.getDefault()}, BLOB and INET6 are Base64-encoded, and PROTOBUF values are written
- * as JSON. A null value is written as {@code nullCharacters} for CHAR and TEXT columns and as
- * nothing for every other type.
- *
- * <p>NOTE(review): {@code String.getBytes()} and {@code new String(byte[], ...)} below use the
- * platform default charset, so non-ASCII CHAR or PROTOBUF content may not round-trip on a platform
- * whose default charset differs from the data's encoding.
- */
-@Deprecated
-public class TextSerializerDeserializer implements SerializerDeserializer {
-  public static final byte[] trueBytes = "true".getBytes();
-  public static final byte[] falseBytes = "false".getBytes();
-  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
-
-  /**
-   * Writes the text form of {@code datum} to {@code out}.
-   *
-   * @return the number of bytes written
-   */
-  @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
-
-    byte[] bytes;
-    int length = 0;
-    TajoDataTypes.DataType dataType = col.getDataType();
-
-    if (datum == null || datum instanceof NullDatum) {
-      switch (dataType.getType()) {
-        case CHAR:
-        case TEXT:
-          length = nullCharacters.length;
-          out.write(nullCharacters);
-          break;
-        default:
-          break;
-      }
-      return length;
-    }
-
-    switch (dataType.getType()) {
-      case BOOLEAN:
-        bytes = datum.asBool() ? trueBytes : falseBytes;
-        out.write(bytes);
-        // Report what was actually written: "false" is one byte longer than "true".
-        length = bytes.length;
-        break;
-      case CHAR:
-        bytes = datum.asTextBytes();
-        // Zero-pad up to the declared CHAR length; a value already at least that long gets no
-        // padding instead of failing with NegativeArraySizeException.
-        byte[] pad = new byte[Math.max(0, dataType.getLength() - datum.size())];
-        out.write(bytes);
-        out.write(pad);
-        length = bytes.length + pad.length;
-        break;
-      case TEXT:
-      case BIT:
-      case INT2:
-      case INT4:
-      case INT8:
-      case FLOAT4:
-      case FLOAT8:
-      case INET4:
-      case DATE:
-      case INTERVAL:
-        bytes = datum.asTextBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case TIME:
-        bytes = ((TimeDatum) datum).asChars(TimeZone.getDefault(), true).getBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case TIMESTAMP:
-        bytes = ((TimestampDatum) datum).asChars(TimeZone.getDefault(), true).getBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case INET6:
-      case BLOB:
-        bytes = Base64.encodeBase64(datum.asByteArray(), false);
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobuf = (ProtobufDatum) datum;
-        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
-        length = protoBytes.length;
-        out.write(protoBytes, 0, protoBytes.length);
-        break;
-      case NULL_TYPE:
-      default:
-        break;
-    }
-    return length;
-  }
-
-  /**
-   * Parses {@code length} bytes of {@code bytes} starting at {@code offset} as a value of
-   * {@code col}'s type. INT1 is parsed into an INT2 datum; unsupported types yield NullDatum.
-   *
-   * @throws RuntimeException if a PROTOBUF value cannot be parsed from JSON
-   */
-  @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
-
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
-        break;
-      case BIT:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
-        break;
-      case CHAR:
-        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createChar(new String(bytes, offset, length).trim());
-        break;
-      case INT1:
-      case INT2:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, offset, length));
-        break;
-      case INT4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt4(NumberUtil.parseInt(bytes, offset, length));
-        break;
-      case INT8:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt8(new String(bytes, offset, length));
-        break;
-      case FLOAT4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createFloat4(new String(bytes, offset, length));
-        break;
-      case FLOAT8:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, offset, length));
-        break;
-      case TEXT:
-        // Copy only when the value is not null.
-        if (isNullText(bytes, offset, length, nullCharacters)) {
-          datum = NullDatum.get();
-        } else {
-          byte[] chars = new byte[length];
-          System.arraycopy(bytes, offset, chars, 0, length);
-          datum = DatumFactory.createText(chars);
-        }
-        break;
-      case DATE:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createDate(new String(bytes, offset, length));
-        break;
-      case TIME:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createTime(new String(bytes, offset, length));
-        break;
-      case TIMESTAMP:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createTimestamp(new String(bytes, offset, length));
-        break;
-      case INTERVAL:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInterval(new String(bytes, offset, length));
-        break;
-      case PROTOBUF: {
-        if (isNull(bytes, offset, length, nullCharacters)) {
-          datum = NullDatum.get();
-        } else {
-          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
-          Message.Builder builder = factory.newBuilder();
-          try {
-            byte[] protoBytes = new byte[length];
-            System.arraycopy(bytes, offset, protoBytes, 0, length);
-            protobufJsonFormat.merge(protoBytes, builder);
-            datum = factory.createDatum(builder.build());
-          } catch (IOException e) {
-            // The cause is preserved in the wrapper; no separate stack-trace print to stderr.
-            throw new RuntimeException(e);
-          }
-        }
-        break;
-      }
-      case INET4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInet4(new String(bytes, offset, length));
-        break;
-      case BLOB: {
-        if (isNull(bytes, offset, length, nullCharacters)) {
-          datum = NullDatum.get();
-        } else {
-          byte[] blob = new byte[length];
-          System.arraycopy(bytes, offset, blob, 0, length);
-          datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
-        }
-        break;
-      }
-      default:
-        datum = NullDatum.get();
-        break;
-    }
-    return datum;
-  }
-
-  /** True for an empty field or one equal to {@code nullBytes}. */
-  private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
-    return length == 0 || ((length == nullBytes.length)
-        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
-  }
-
-  /** True only for a non-empty field equal to {@code nullBytes}; an empty text field is not null. */
-  private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
-    return length > 0 && length == nullBytes.length
-        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
deleted file mode 100644
index 8dffd8d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.common.ProtoObject;
-
-import java.util.Comparator;
-
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * Base class for comparators that order {@link Tuple}s and also expose a
- * {@link TupleComparatorProto} form through {@link ProtoObject}.
- */
-public abstract class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {
-
-  /** NOTE(review): presumably reports whether the first sort key is ascending — confirm in subclasses. */
-  public abstract boolean isAscendingFirstKey();
-
-  @Override
-  public abstract int compare(Tuple o1, Tuple o2);
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
deleted file mode 100644
index e824b99..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-
-import java.util.Comparator;
-
-/**
- * A pair of start and end tuples over the sort keys given by the constructor's {@code sortSpecs}.
- * {@link #toString()} renders it as {@code [start, end)}.
- *
- * <p>NOTE(review): {@link #compareTo} orders ranges with the sort-key comparator, while
- * {@link #equals} uses {@code Tuple.equals}, so the natural ordering may be inconsistent with equals.
- */
-public class TupleRange implements Comparable<TupleRange>, Cloneable {
-  private Tuple start;
-  private Tuple end;
-  private final TupleComparator comp;
-
-  public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) {
-    this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
-    // if there is only one value, start == end
-    this.start = start;
-    this.end = end;
-  }
-
-  /** Builds a schema whose columns are the sort keys of {@code sortSpecs}, in order. */
-  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
-    Schema schema = new Schema();
-    for (SortSpec spec : sortSpecs) {
-      schema.addColumn(spec.getSortKey());
-    }
-
-    return schema;
-  }
-
-  public void setStart(Tuple tuple) {
-    this.start = tuple;
-  }
-
-  public final Tuple getStart() {
-    return this.start;
-  }
-
-  public void setEnd(Tuple tuple) {
-    this.end = tuple;
-  }
-
-  public final Tuple getEnd() {
-    return this.end;
-  }
-
-  @Override
-  public String toString() {
-    return "[" + this.start + ", " + this.end + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(start, end);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof TupleRange)) {
-      return false;
-    }
-    TupleRange other = (TupleRange) obj;
-    // Null-safe, like hashCode(): a range with a null bound must not throw NullPointerException.
-    return Objects.equal(this.start, other.start) && Objects.equal(this.end, other.end);
-  }
-
-  /** Compares start tuples with the sort-key comparator, then end tuples on a tie. */
-  @Override
-  public int compareTo(TupleRange o) {
-    // TODO - should handle overlap
-    int cmpVal = comp.compare(this.start, o.start);
-    if (cmpVal != 0) {
-      return cmpVal;
-    } else {
-      return comp.compare(this.end, o.end);
-    }
-  }
-
-  /** Orders ranges in the reverse of their natural ordering. */
-  public static class DescendingTupleRangeComparator
-      implements Comparator<TupleRange> {
-
-    @Override
-    public int compare(TupleRange left, TupleRange right) {
-      return right.compareTo(left);
-    }
-  }
-
-  /** Returns a copy with cloned start/end tuples; the comparator instance is shared. */
-  @Override
-  public TupleRange clone() throws CloneNotSupportedException {
-    TupleRange newRange = (TupleRange) super.clone();
-    newRange.setStart(start == null ? null : start.clone());
-    newRange.setEnd(end == null ? null : end.clone());
-    return newRange;
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
deleted file mode 100644
index ad19101..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.annotation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Marker annotation for types. It is retained at runtime, so it can be detected reflectively.
- * NOTE(review): presumably marks storage implementations whose files can be split into several
- * fragments — confirm against the code that checks for it.
- */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface ForSplitableStore {
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
deleted file mode 100644
index 6af8da0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.FileAppender;
-import org.apache.tajo.storage.TableStatistics;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-/**
- * FileAppender for writing to Avro files.
- */
-public class AvroAppender extends FileAppender {
- private TableStatistics stats;
- private Schema avroSchema;
- private List<Schema.Field> avroFields;
- private DataFileWriter<GenericRecord> dataFileWriter;
-
- /**
- * Creates a new AvroAppender.
- *
- * @param conf Configuration properties.
- * @param schema The table schema.
- * @param meta The table metadata.
- * @param path The path of the Parquet file to write to.
- */
- public AvroAppender(Configuration conf,
- org.apache.tajo.catalog.Schema schema,
- TableMeta meta, Path path) throws IOException {
- super(conf, schema, meta, path);
- }
-
- /**
- * Initializes the Appender.
- */
- public void init() throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- if (!fs.exists(path.getParent())) {
- throw new FileNotFoundException(path.toString());
- }
- FSDataOutputStream outputStream = fs.create(path);
-
- avroSchema = AvroUtil.getAvroSchema(meta, conf);
- avroFields = avroSchema.getFields();
-
- DatumWriter<GenericRecord> datumWriter =
- new GenericDatumWriter<GenericRecord>(avroSchema);
- dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
- dataFileWriter.create(avroSchema, outputStream);
-
- if (enabledStats) {
- this.stats = new TableStatistics(schema);
- }
- super.init();
- }
-
- /**
- * Gets the current offset. Tracking offsets is currenly not implemented, so
- * this method always returns 0.
- *
- * @return 0
- */
- @Override
- public long getOffset() throws IOException {
- return 0;
- }
-
- private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
- if (tuple.get(i) instanceof NullDatum) {
- return null;
- }
- switch (avroType) {
- case NULL:
- return null;
- case BOOLEAN:
- return tuple.getBool(i);
- case INT:
- return tuple.getInt4(i);
- case LONG:
- return tuple.getInt8(i);
- case FLOAT:
- return tuple.getFloat4(i);
- case DOUBLE:
- return tuple.getFloat8(i);
- case BYTES:
- case FIXED:
- return ByteBuffer.wrap(tuple.getBytes(i));
- case STRING:
- return tuple.getText(i);
- default:
- throw new RuntimeException("Unknown primitive type.");
- }
- }
-
- /**
- * Write a Tuple to the Avro file.
- *
- * @param tuple The Tuple to write.
- */
- @Override
- public void addTuple(Tuple tuple) throws IOException {
- GenericRecord record = new GenericData.Record(avroSchema);
- for (int i = 0; i < schema.size(); ++i) {
- Column column = schema.getColumn(i);
- if (enabledStats) {
- stats.analyzeField(i, tuple.get(i));
- }
- Object value;
- Schema.Field avroField = avroFields.get(i);
- Schema.Type avroType = avroField.schema().getType();
- switch (avroType) {
- case NULL:
- case BOOLEAN:
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case BYTES:
- case STRING:
- case FIXED:
- value = getPrimitive(tuple, i, avroType);
- break;
- case RECORD:
- throw new RuntimeException("Avro RECORD not supported.");
- case ENUM:
- throw new RuntimeException("Avro ENUM not supported.");
- case MAP:
- throw new RuntimeException("Avro MAP not supported.");
- case UNION:
- List<Schema> schemas = avroField.schema().getTypes();
- if (schemas.size() != 2) {
- throw new RuntimeException("Avro UNION not supported.");
- }
- if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
- value = getPrimitive(tuple, i, schemas.get(1).getType());
- } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
- value = getPrimitive(tuple, i, schemas.get(0).getType());
- } else {
- throw new RuntimeException("Avro UNION not supported.");
- }
- break;
- default:
- throw new RuntimeException("Unknown type: " + avroType);
- }
- record.put(i, value);
- }
- dataFileWriter.append(record);
-
- if (enabledStats) {
- stats.incrementRow();
- }
- }
-
- /**
- * Flushes the current state of the file.
- */
- @Override
- public void flush() throws IOException {
- dataFileWriter.flush();
- }
-
- /**
- * Closes the Appender.
- */
- @Override
- public void close() throws IOException {
- dataFileWriter.close();
- }
-
- /**
- * If table statistics is enabled, retrieve the table statistics.
- *
- * @return Table statistics if enabled or null otherwise.
- */
- @Override
- public TableStats getStats() {
- if (enabledStats) {
- return stats.getTableStat();
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
deleted file mode 100644
index 816ae25..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * FileScanner for reading Avro files
- */
-public class AvroScanner extends FileScanner {
- private Schema avroSchema;
- private List<Schema.Field> avroFields;
- private DataFileReader<GenericRecord> dataFileReader;
- private int[] projectionMap;
-
- /**
- * Creates a new AvroScanner.
- *
- * @param conf
- * @param schema
- * @param meta
- * @param fragment
- */
- public AvroScanner(Configuration conf,
- final org.apache.tajo.catalog.Schema schema,
- final TableMeta meta, final FileFragment fragment) {
- super(conf, schema, meta, fragment);
- }
-
- /**
- * Initializes the AvroScanner.
- */
- @Override
- public void init() throws IOException {
- if (targets == null) {
- targets = schema.toArray();
- }
- prepareProjection(targets);
-
- avroSchema = AvroUtil.getAvroSchema(meta, conf);
- avroFields = avroSchema.getFields();
-
- DatumReader<GenericRecord> datumReader =
- new GenericDatumReader<GenericRecord>(avroSchema);
- SeekableInput input = new FsInput(fragment.getPath(), conf);
- dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
- super.init();
- }
-
- private void prepareProjection(Column[] targets) {
- projectionMap = new int[targets.length];
- for (int i = 0; i < targets.length; ++i) {
- projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName());
- }
- }
-
- private static String fromAvroString(Object value) {
- if (value instanceof Utf8) {
- Utf8 utf8 = (Utf8)value;
- return utf8.toString();
- }
- return value.toString();
- }
-
- private static Schema getNonNull(Schema schema) {
- if (!schema.getType().equals(Schema.Type.UNION)) {
- return schema;
- }
- List<Schema> schemas = schema.getTypes();
- if (schemas.size() != 2) {
- return schema;
- }
- if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
- return schemas.get(1);
- } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
- return schemas.get(0);
- } else {
- return schema;
- }
- }
-
- private Datum convertInt(Object value, TajoDataTypes.Type tajoType) {
- int intValue = (Integer)value;
- switch (tajoType) {
- case BIT:
- return DatumFactory.createBit((byte)(intValue & 0xff));
- case INT2:
- return DatumFactory.createInt2((short)intValue);
- default:
- return DatumFactory.createInt4(intValue);
- }
- }
-
- private Datum convertBytes(Object value, TajoDataTypes.Type tajoType,
- DataType dataType) {
- ByteBuffer buffer = (ByteBuffer)value;
- byte[] bytes = new byte[buffer.capacity()];
- buffer.get(bytes, 0, bytes.length);
- switch (tajoType) {
- case INET4:
- return DatumFactory.createInet4(bytes);
- case PROTOBUF:
- try {
- ProtobufDatumFactory factory =
- ProtobufDatumFactory.get(dataType.getCode());
- Message.Builder builder = factory.newBuilder();
- builder.mergeFrom(bytes);
- return factory.createDatum(builder);
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- default:
- return new BlobDatum(bytes);
- }
- }
-
- private Datum convertString(Object value, TajoDataTypes.Type tajoType) {
- switch (tajoType) {
- case CHAR:
- return DatumFactory.createChar(fromAvroString(value));
- default:
- return DatumFactory.createText(fromAvroString(value));
- }
- }
-
- /**
- * Reads the next Tuple from the Avro file.
- *
- * @return The next Tuple from the Avro file or null if end of file is
- * reached.
- */
- @Override
- public Tuple next() throws IOException {
- if (!dataFileReader.hasNext()) {
- return null;
- }
-
- Tuple tuple = new VTuple(schema.size());
- GenericRecord record = dataFileReader.next();
- for (int i = 0; i < projectionMap.length; ++i) {
- int columnIndex = projectionMap[i];
- Object value = record.get(columnIndex);
- if (value == null) {
- tuple.put(columnIndex, NullDatum.get());
- continue;
- }
-
- // Get Avro type.
- Schema.Field avroField = avroFields.get(columnIndex);
- Schema nonNullAvroSchema = getNonNull(avroField.schema());
- Schema.Type avroType = nonNullAvroSchema.getType();
-
- // Get Tajo type.
- Column column = schema.getColumn(columnIndex);
- DataType dataType = column.getDataType();
- TajoDataTypes.Type tajoType = dataType.getType();
- switch (avroType) {
- case NULL:
- tuple.put(columnIndex, NullDatum.get());
- break;
- case BOOLEAN:
- tuple.put(columnIndex, DatumFactory.createBool((Boolean)value));
- break;
- case INT:
- tuple.put(columnIndex, convertInt(value, tajoType));
- break;
- case LONG:
- tuple.put(columnIndex, DatumFactory.createInt8((Long)value));
- break;
- case FLOAT:
- tuple.put(columnIndex, DatumFactory.createFloat4((Float)value));
- break;
- case DOUBLE:
- tuple.put(columnIndex, DatumFactory.createFloat8((Double)value));
- break;
- case BYTES:
- tuple.put(columnIndex, convertBytes(value, tajoType, dataType));
- break;
- case STRING:
- tuple.put(columnIndex, convertString(value, tajoType));
- break;
- case RECORD:
- throw new RuntimeException("Avro RECORD not supported.");
- case ENUM:
- throw new RuntimeException("Avro ENUM not supported.");
- case MAP:
- throw new RuntimeException("Avro MAP not supported.");
- case UNION:
- throw new RuntimeException("Avro UNION not supported.");
- case FIXED:
- tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes()));
- break;
- default:
- throw new RuntimeException("Unknown type.");
- }
- }
- return tuple;
- }
-
- /**
- * Resets the scanner
- */
- @Override
- public void reset() throws IOException {
- }
-
- /**
- * Closes the scanner.
- */
- @Override
- public void close() throws IOException {
- if (dataFileReader != null) {
- dataFileReader.close();
- }
- }
-
- /**
- * Returns whether this scanner is projectable.
- *
- * @return true
- */
- @Override
- public boolean isProjectable() {
- return true;
- }
-
- /**
- * Returns whether this scanner is selectable.
- *
- * @return false
- */
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- /**
- * Returns whether this scanner is splittable.
- *
- * @return false
- */
- @Override
- public boolean isSplittable() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
deleted file mode 100644
index 0d14c3d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.StorageConstants;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-
-public class AvroUtil {
- public static Schema getAvroSchema(TableMeta meta, Configuration conf)
- throws IOException {
-
- boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL);
- boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL);
- if (!isSchemaLiteral && !isSchemaUrl) {
- throw new RuntimeException("No Avro schema for table.");
- }
- if (isSchemaLiteral) {
- String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL);
- return new Schema.Parser().parse(schema);
- }
-
- String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
- if (schemaURL.toLowerCase().startsWith("http")) {
- return getAvroSchemaFromHttp(schemaURL);
- } else {
- return getAvroSchemaFromFileSystem(schemaURL, conf);
- }
- }
-
- public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException {
- InputStream inputStream = new URL(schemaURL).openStream();
-
- try {
- return new Schema.Parser().parse(inputStream);
- } finally {
- IOUtils.closeStream(inputStream);
- }
- }
-
- public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException {
- Path schemaPath = new Path(schemaURL);
- FileSystem fs = schemaPath.getFileSystem(conf);
- FSDataInputStream inputStream = fs.open(schemaPath);
-
- try {
- return new Schema.Parser().parse(inputStream);
- } finally {
- IOUtils.closeStream(inputStream);
- }
- }
-}