You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:13 UTC
[09/29] tajo git commit: TAJO-1095: Implement Json file scanner.
TAJO-1095: Implement Json file scanner.
Closes #181
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cd38dffb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cd38dffb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cd38dffb
Branch: refs/heads/hbase_storage
Commit: cd38dffb908a3959472f5ddb705db71d0e48ad89
Parents: 7d41c67
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Nov 28 17:26:38 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Nov 28 17:26:38 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/catalog/CatalogUtil.java | 9 +-
.../org/apache/tajo/catalog/SchemaUtil.java | 8 +
.../src/main/proto/CatalogProtos.proto | 1 +
.../apache/tajo/storage/StorageConstants.java | 2 +-
.../java/org/apache/tajo/storage/VTuple.java | 4 +-
tajo-storage/pom.xml | 6 +
.../tajo/storage/json/JsonLineDeserializer.java | 220 +++++++++++++++++++
.../apache/tajo/storage/json/JsonLineSerDe.java | 37 ++++
.../tajo/storage/json/JsonLineSerializer.java | 134 +++++++++++
.../tajo/storage/text/CSVLineDeserializer.java | 4 +-
.../apache/tajo/storage/text/CSVLineSerDe.java | 4 -
.../tajo/storage/text/CSVLineSerializer.java | 15 +-
.../tajo/storage/text/DelimitedTextFile.java | 8 +-
.../text/TextFieldSerializerDeserializer.java | 2 +-
.../tajo/storage/text/TextLineDeserializer.java | 4 +-
.../src/main/resources/storage-default.xml | 16 +-
.../org/apache/tajo/storage/TestStorages.java | 105 +++++----
.../apache/tajo/storage/json/TestJsonSerDe.java | 101 +++++++++
.../dataset/TestJsonSerDe/testVariousType.json | 1 +
.../src/test/resources/storage-default.xml | 16 +-
.../src/test/resources/testVariousTypes.avsc | 19 +-
22 files changed, 637 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d0677c..025ae88 100644
--- a/CHANGES
+++ b/CHANGES
@@ -11,6 +11,8 @@ Release 0.9.1 - unreleased
TAJO-235: Support Oracle CatalogStore. (Jihun Kang via hyunsik)
+ TAJO-1095: Implement Json file scanner. (hyunsik)
+
IMPROVEMENT
TAJO-1165: Needs to show error messages on query_executor.jsp. (Jihun Kang via jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 737c9ae..f2d9b9c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -31,9 +31,11 @@ import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.TUtil;
+import org.mortbay.util.ajax.JSON;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -278,13 +280,16 @@ public class CatalogUtil {
return StoreType.AVRO;
} else if (typeStr.equalsIgnoreCase(StoreType.TEXTFILE.name())) {
return StoreType.TEXTFILE;
+ } else if (typeStr.equalsIgnoreCase(StoreType.JSON.name())) {
+ return StoreType.JSON;
} else {
return null;
}
}
public static TableMeta newTableMeta(StoreType type) {
- return new TableMeta(type, new KeyValueSet());
+ KeyValueSet defaultProperties = CatalogUtil.newPhysicalProperties(type);
+ return new TableMeta(type, defaultProperties);
}
public static TableMeta newTableMeta(StoreType type, KeyValueSet options) {
@@ -821,6 +826,8 @@ public class CatalogUtil {
KeyValueSet options = new KeyValueSet();
if (StoreType.CSV == type || StoreType.TEXTFILE == type) {
options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ } else if (StoreType.JSON == type) {
+ options.set(StorageConstants.TEXT_SERDE_CLASS, "org.apache.tajo.storage.json.JsonLineSerDe");
} else if (StoreType.RCFILE == type) {
options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
} else if (StoreType.SEQUENCEFILE == type) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
index ee670ef..23ebe1b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
@@ -100,4 +100,12 @@ public class SchemaUtil {
}
return types;
}
+
+  /**
+   * Collects the simple name of every column of the given schema, in column order.
+   *
+   * @param schema schema whose columns are read by index
+   * @return a newly allocated array holding one simple name per column
+   */
+  public static String [] toSimpleNames(Schema schema) {
+    final int columnCount = schema.size();
+    final String [] simpleNames = new String[columnCount];
+    int idx = 0;
+    while (idx < columnCount) {
+      simpleNames[idx] = schema.getColumn(idx).getSimpleName();
+      idx++;
+    }
+    return simpleNames;
+  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 99f594a..f29bc6c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -36,6 +36,7 @@ enum StoreType {
SEQUENCEFILE = 8;
AVRO = 9;
TEXTFILE = 10;
+ JSON = 11;
}
enum OrderType {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index 3065d31..a3d8de0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -33,7 +33,7 @@ public class StorageConstants {
public static final String TEXT_DELIMITER = "text.delimiter";
public static final String TEXT_NULL = "text.null";
public static final String TEXT_SERDE_CLASS = "text.serde.class";
- public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerde";
+ public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe";
@Deprecated
public static final String SEQUENCEFILE_DELIMITER = "sequencefile.delimiter";
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
index 6304734..5e839b7 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -56,7 +56,7 @@ public class VTuple implements Tuple, Cloneable {
@Override
public boolean isNull(int fieldid) {
- return values[fieldid].isNull();
+ return values[fieldid] == null || values[fieldid].isNull();
}
@Override
@@ -93,7 +93,7 @@ public class VTuple implements Tuple, Cloneable {
}
public void put(Datum [] values) {
- System.arraycopy(values, 0, this.values, 0, size());
+ System.arraycopy(values, 0, this.values, 0, values.length);
}
//////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index c6877c4..ef26a32 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -72,6 +72,7 @@
<configuration>
<excludes>
<exclude>src/test/resources/testVariousTypes.avsc</exclude>
+ <exclude>src/test/resources/dataset/TestJsonSerDe/*.json</exclude>
</excludes>
</configuration>
</plugin>
@@ -313,6 +314,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
+ <dependency>
+ <groupId>net.minidev</groupId>
+ <artifactId>json-smart</artifactId>
+ <version>2.0</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
new file mode 100644
index 0000000..37cd9f3
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+
+import io.netty.buffer.ByteBuf;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Line deserializer that parses each line as one JSON object and copies the
+ * projected columns into a tuple. Fields are looked up in the JSON object by
+ * the simple name of the corresponding schema column.
+ */
+public class JsonLineDeserializer extends TextLineDeserializer {
+  private JSONParser parser;
+  private Type [] types;
+  private String [] columnNames;
+
+  public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    super(schema, meta, targetColumnIndexes);
+  }
+
+  /**
+   * Caches the column types and simple names of the schema and creates the
+   * JSON parser in {@code MODE_JSON_SIMPLE}.
+   */
+  @Override
+  public void init() {
+    types = SchemaUtil.toTypes(schema);
+    columnNames = SchemaUtil.toSimpleNames(schema);
+
+    parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
+  }
+
+  /**
+   * Parses all readable bytes of {@code buf} as a JSON object and fills the
+   * target columns of {@code output}. A field that is absent from the JSON
+   * object, or whose value is null, is stored as {@link NullDatum}.
+   *
+   * @param buf    line to be parsed; its readable bytes are consumed
+   * @param output tuple to be filled, indexed by schema column position
+   * @throws IOException if the line cannot be parsed or converted, or if a
+   *                     column type is not supported
+   */
+  @Override
+  public void deserialize(ByteBuf buf, Tuple output) throws IOException {
+    byte [] line = new byte[buf.readableBytes()];
+    buf.readBytes(line);
+
+    try {
+      JSONObject object = (JSONObject) parser.parse(line);
+
+      for (int i = 0; i < targetColumnIndexes.length; i++) {
+        int actualIdx = targetColumnIndexes[i];
+        String fieldName = columnNames[actualIdx];
+
+        if (!object.containsKey(fieldName)) {
+          output.put(actualIdx, NullDatum.get());
+          continue;
+        }
+
+        switch (types[actualIdx]) {
+        case BOOLEAN:
+          String boolStr = object.getAsString(fieldName);
+          if (boolStr != null) {
+            output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true")));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case CHAR:
+          String charStr = object.getAsString(fieldName);
+          if (charStr != null) {
+            output.put(actualIdx, DatumFactory.createChar(charStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case INT1:
+        case INT2:
+          Number int2Num = object.getAsNumber(fieldName);
+          if (int2Num != null) {
+            output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case INT4:
+          Number int4Num = object.getAsNumber(fieldName);
+          if (int4Num != null) {
+            output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case INT8:
+          Number int8Num = object.getAsNumber(fieldName);
+          if (int8Num != null) {
+            output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case FLOAT4:
+          Number float4Num = object.getAsNumber(fieldName);
+          if (float4Num != null) {
+            output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case FLOAT8:
+          Number float8Num = object.getAsNumber(fieldName);
+          if (float8Num != null) {
+            output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue()));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case TEXT:
+          String textStr = object.getAsString(fieldName);
+          if (textStr != null) {
+            output.put(actualIdx, DatumFactory.createText(textStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case TIMESTAMP:
+          String timestampStr = object.getAsString(fieldName);
+          if (timestampStr != null) {
+            output.put(actualIdx, DatumFactory.createTimestamp(timestampStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case TIME:
+          String timeStr = object.getAsString(fieldName);
+          if (timeStr != null) {
+            output.put(actualIdx, DatumFactory.createTime(timeStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case DATE:
+          String dateStr = object.getAsString(fieldName);
+          if (dateStr != null) {
+            output.put(actualIdx, DatumFactory.createDate(dateStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+        case BIT:
+        case BINARY:
+        case VARBINARY:
+        case BLOB: {
+          Object jsonValue = object.get(fieldName);
+
+          if (jsonValue == null) {
+            output.put(actualIdx, NullDatum.get());
+          } else if (jsonValue instanceof String) {
+            output.put(actualIdx, DatumFactory.createBlob((String) jsonValue));
+          } else if (jsonValue instanceof JSONArray) {
+            JSONArray jsonArray = (JSONArray) jsonValue;
+            byte[] bytes = new byte[jsonArray.size()];
+            int arrayIdx = 0;
+            for (Object element : jsonArray) {
+              // Read through Number rather than Long so the conversion does not
+              // depend on which boxed integer type the parser produced.
+              bytes[arrayIdx++] = ((Number) element).byteValue();
+            }
+            // An empty array is stored as NULL rather than as an empty blob.
+            if (bytes.length > 0) {
+              output.put(actualIdx, DatumFactory.createBlob(bytes));
+            } else {
+              output.put(actualIdx, NullDatum.get());
+            }
+          } else {
+            // Report the class of the offending field value, not of the row object.
+            throw new IOException("Unknown json object: " + jsonValue.getClass().getSimpleName());
+          }
+          break;
+        }
+        case INET4:
+          String inetStr = object.getAsString(fieldName);
+          if (inetStr != null) {
+            output.put(actualIdx, DatumFactory.createInet4(inetStr));
+          } else {
+            output.put(actualIdx, NullDatum.get());
+          }
+          break;
+
+        case NULL_TYPE:
+          output.put(actualIdx, NullDatum.get());
+          break;
+
+        default:
+          throw new NotImplementedException(types[actualIdx].name() + " is not supported.");
+        }
+      }
+
+    } catch (IOException e) {
+      // Already the declared exception type; rethrow as-is instead of wrapping it twice.
+      throw e;
+    } catch (Throwable e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Nothing to release: this deserializer holds no external resources. */
+  @Override
+  public void release() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
new file mode 100644
index 0000000..6db2c29
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineSerDe;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+/**
+ * {@link TextLineSerDe} that supplies the JSON line serializer and deserializer.
+ */
+public class JsonLineSerDe extends TextLineSerDe {
+  /** Creates a {@link JsonLineSerializer} for the given schema and table meta. */
+  @Override
+  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+    final TextLineSerializer serializer = new JsonLineSerializer(schema, meta);
+    return serializer;
+  }
+
+  /** Creates a {@link JsonLineDeserializer} for the given schema, table meta and projection. */
+  @Override
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    final TextLineDeserializer deserializer =
+        new JsonLineDeserializer(schema, meta, targetColumnIndexes);
+    return deserializer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
new file mode 100644
index 0000000..c7007d8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+
+import net.minidev.json.JSONObject;
+import org.apache.commons.lang.CharSet;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextLineSerDe;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+public class JsonLineSerializer extends TextLineSerializer {
+ private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+
+ private Type [] types;
+ private String [] simpleNames;
+ private int columnNum;
+
+
+ public JsonLineSerializer(Schema schema, TableMeta meta) {
+ super(schema, meta);
+ }
+
+ @Override
+ public void init() {
+ types = SchemaUtil.toTypes(schema);
+ simpleNames = SchemaUtil.toSimpleNames(schema);
+ columnNum = schema.size();
+ }
+
+ @Override
+ public int serialize(OutputStream out, Tuple input) throws IOException {
+ JSONObject jsonObject = new JSONObject();
+
+ for (int i = 0; i < columnNum; i++) {
+ if (input.isNull(i)) {
+ continue;
+ }
+
+ String fieldName = simpleNames[i];
+ Type type = types[i];
+
+ switch (type) {
+
+ case BOOLEAN:
+ jsonObject.put(fieldName, input.getBool(i));
+ break;
+
+ case INT1:
+ case INT2:
+ jsonObject.put(fieldName, input.getInt2(i));
+ break;
+
+ case INT4:
+ jsonObject.put(fieldName, input.getInt4(i));
+ break;
+
+ case INT8:
+ jsonObject.put(fieldName, input.getInt8(i));
+ break;
+
+ case FLOAT4:
+ jsonObject.put(fieldName, input.getFloat4(i));
+ break;
+
+ case FLOAT8:
+ jsonObject.put(fieldName, input.getFloat8(i));
+ break;
+
+ case CHAR:
+ case TEXT:
+ case VARCHAR:
+ case INET4:
+ case TIMESTAMP:
+ case DATE:
+ case TIME:
+ case INTERVAL:
+ jsonObject.put(fieldName, input.getText(i));
+ break;
+
+ case BIT:
+ case BINARY:
+ case BLOB:
+ case VARBINARY:
+ jsonObject.put(fieldName, input.getBytes(i));
+ break;
+
+ case NULL_TYPE:
+ break;
+
+ default:
+ throw new NotImplementedException(types[i].name() + " is not supported.");
+ }
+ }
+
+ String jsonStr = jsonObject.toJSONString();
+ byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET);
+ out.write(jsonBytes);
+ return jsonBytes.length;
+ }
+
+ @Override
+ public void release() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index f580da1..0e2dfb0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -48,7 +48,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
fieldSerDer = new TextFieldSerializerDeserializer();
}
- public void deserialize(final ByteBuf lineBuf, Tuple tuple) throws IOException {
+ public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException {
int[] projection = targetColumnIndexes;
if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
return;
@@ -73,7 +73,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
lineBuf.setIndex(start, start + fieldLength);
Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
- tuple.put(currentIndex, datum);
+ output.put(currentIndex, datum);
currentTarget++;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
index e2686a6..2fe7f23 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -24,10 +24,6 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.StorageConstants;
public class CSVLineSerDe extends TextLineSerDe {
-
- public CSVLineSerDe() {
- }
-
@Override
public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
index 684519c..7397000 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -18,6 +18,7 @@
package org.apache.tajo.storage.text;
+import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.Datum;
@@ -32,6 +33,7 @@ public class CSVLineSerializer extends TextLineSerializer {
private byte [] nullChars;
private char delimiter;
+ private int columnNum;
public CSVLineSerializer(Schema schema, TableMeta meta) {
super(schema, meta);
@@ -41,25 +43,26 @@ public class CSVLineSerializer extends TextLineSerializer {
public void init() {
nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+ columnNum = schema.size();
serde = new TextFieldSerializerDeserializer();
}
@Override
public int serialize(OutputStream out, Tuple input) throws IOException {
- int rowBytes = 0;
+ int writtenBytes = 0;
- for (int i = 0; i < schema.size(); i++) {
+ for (int i = 0; i < columnNum; i++) {
Datum datum = input.get(i);
- rowBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
+ writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
- if (schema.size() - 1 > i) {
+ if (columnNum - 1 > i) {
out.write((byte) delimiter);
- rowBytes += 1;
+ writtenBytes += 1;
}
}
- return rowBytes;
+ return writtenBytes;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index d15f394..2218fae 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -79,12 +79,12 @@ public class DelimitedTextFile {
if (serdeClassCache.containsKey(serDeClassName)) {
serdeClass = serdeClassCache.get(serDeClassName);
} else {
- serdeClass = (Class<? extends TextLineSerDe>) Class.forName(CSVLineSerDe.class.getName());
+ serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
serdeClassCache.put(serDeClassName, serdeClass);
}
lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
} catch (Throwable e) {
- throw new RuntimeException("TextLineSerde class cannot be initialized");
+ throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
}
return lineSerder;
@@ -382,7 +382,9 @@ public class DelimitedTextFile {
@Override
public void close() throws IOException {
try {
- deserializer.release();
+ if (deserializer != null) {
+ deserializer.release();
+ }
if (tableStats != null && reader != null) {
tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
index 9722959..95d0407 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -38,7 +38,7 @@ import java.nio.charset.CharsetDecoder;
public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
public static final byte[] trueBytes = "true".getBytes();
public static final byte[] falseBytes = "false".getBytes();
- private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+ private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index 645d118..b0d3c3a 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -48,10 +48,10 @@ public abstract class TextLineDeserializer {
* It fills a tuple with a read fields in a given line.
*
* @param buf Read line
- * @param tuple Tuple to be filled with read fields
+ * @param output Tuple to be filled with read fields
* @throws java.io.IOException
*/
- public abstract void deserialize(final ByteBuf buf, Tuple tuple) throws IOException;
+ public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException;
/**
* Release external resources
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
index 064f250..e861b7d 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -35,7 +35,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
+ <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
</property>
<!--- Fragment Class Configurations -->
@@ -48,6 +48,10 @@
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
+ <name>tajo.storage.fragment.json.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
<name>tajo.storage.fragment.raw.class</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
@@ -84,6 +88,11 @@
</property>
<property>
+ <name>tajo.storage.scanner-handler.json.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
</property>
@@ -130,6 +139,11 @@
</property>
<property>
+ <name>tajo.storage.appender-handler.json.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
<name>tajo.storage.appender-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
</property>
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 6e2bc35..c581926 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -80,18 +80,17 @@ public class TestStorages {
" \"name\": \"testNullHandlingTypes\",\n" +
" \"fields\": [\n" +
" { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
- " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" +
" { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" +
- " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" +
- " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" +
- " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" +
+ " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" +
+ " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" +
+ " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" +
+ " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" +
" { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
- " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" +
- " { \"name\": \"col12\", \"type\": \"null\" },\n" +
- " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" +
+ " { \"name\": \"col11\", \"type\": \"null\" },\n" +
+ " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" +
" ]\n" +
"}\n";
@@ -129,6 +128,7 @@ public class TestStorages {
{StoreType.SEQUENCEFILE, true, true, false},
{StoreType.AVRO, false, false, false},
{StoreType.TEXTFILE, true, true, false},
+ {StoreType.JSON, true, true, false},
});
}
@@ -298,20 +298,23 @@ public class TestStorages {
@Test
public void testVariousTypes() throws IOException {
+ boolean handleProtobuf = storeType != StoreType.JSON;
+
Schema schema = new Schema();
schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+ schema.addColumn("col2", Type.CHAR, 7);
+ schema.addColumn("col3", Type.INT2);
+ schema.addColumn("col4", Type.INT4);
+ schema.addColumn("col5", Type.INT8);
+ schema.addColumn("col6", Type.FLOAT4);
+ schema.addColumn("col7", Type.FLOAT8);
+ schema.addColumn("col8", Type.TEXT);
+ schema.addColumn("col9", Type.BLOB);
+ schema.addColumn("col10", Type.INET4);
+ schema.addColumn("col11", Type.NULL_TYPE);
+ if (handleProtobuf) {
+ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+ }
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
@@ -328,10 +331,9 @@ public class TestStorages {
QueryId queryid = new QueryId("12345", 5);
ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
- Tuple tuple = new VTuple(13);
+ Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0));
tuple.put(new Datum[] {
DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
DatumFactory.createChar("hyunsik"),
DatumFactory.createInt2((short) 17),
DatumFactory.createInt4(59),
@@ -341,9 +343,12 @@ public class TestStorages {
DatumFactory.createText("hyunsik"),
DatumFactory.createBlob("hyunsik".getBytes()),
DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
+ NullDatum.get()
});
+ if (handleProtobuf) {
+ tuple.put(11, factory.createDatum(queryid.getProto()));
+ }
+
appender.addTuple(tuple);
appender.flush();
appender.close();
@@ -364,20 +369,24 @@ public class TestStorages {
@Test
public void testNullHandlingTypes() throws IOException {
+ boolean handleProtobuf = storeType != StoreType.JSON;
+
Schema schema = new Schema();
schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+ schema.addColumn("col2", Type.CHAR, 7);
+ schema.addColumn("col3", Type.INT2);
+ schema.addColumn("col4", Type.INT4);
+ schema.addColumn("col5", Type.INT8);
+ schema.addColumn("col6", Type.FLOAT4);
+ schema.addColumn("col7", Type.FLOAT8);
+ schema.addColumn("col8", Type.TEXT);
+ schema.addColumn("col9", Type.BLOB);
+ schema.addColumn("col10", Type.INET4);
+ schema.addColumn("col11", Type.NULL_TYPE);
+
+ if (handleProtobuf) {
+ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+ }
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
@@ -397,11 +406,10 @@ public class TestStorages {
QueryId queryid = new QueryId("12345", 5);
ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple seedTuple = new VTuple(13);
+ int columnNum = 11 + (handleProtobuf ? 1 : 0);
+ Tuple seedTuple = new VTuple(columnNum);
seedTuple.put(new Datum[]{
DatumFactory.createBool(true), // 0
- DatumFactory.createBit((byte) 0x99), // 1
DatumFactory.createChar("hyunsik"), // 2
DatumFactory.createInt2((short) 17), // 3
DatumFactory.createInt4(59), // 4
@@ -412,14 +420,17 @@ public class TestStorages {
DatumFactory.createBlob("hyunsik".getBytes()),// 9
DatumFactory.createInet4("192.168.0.1"), // 10
NullDatum.get(), // 11
- factory.createDatum(queryid.getProto()) // 12
});
+ if (handleProtobuf) {
+ seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12
+ }
+
// Making tuples with different null column positions
Tuple tuple;
- for (int i = 0; i < 13; i++) {
- tuple = new VTuple(13);
- for (int j = 0; j < 13; j++) {
+ for (int i = 0; i < columnNum; i++) {
+ tuple = new VTuple(columnNum);
+ for (int j = 0; j < columnNum; j++) {
if (i == j) { // i'th column will have NULL value
tuple.put(j, NullDatum.get());
} else {
@@ -439,8 +450,8 @@ public class TestStorages {
Tuple retrieved;
int i = 0;
while ((retrieved = scanner.next()) != null) {
- assertEquals(13, retrieved.size());
- for (int j = 0; j < 13; j++) {
+ assertEquals(columnNum, retrieved.size());
+ for (int j = 0; j < columnNum; j++) {
if (i == j) {
assertEquals(NullDatum.get(), retrieved.get(j));
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
new file mode 100644
index 0000000..038bc17
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/** Verifies that one JSON record on disk is scanned into a correctly typed tuple. */
+public class TestJsonSerDe {
+  private static final Schema schema = new Schema();
+  static {
+    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col2", TajoDataTypes.Type.CHAR, 7);
+    schema.addColumn("col3", TajoDataTypes.Type.INT2);
+    schema.addColumn("col4", TajoDataTypes.Type.INT4);
+    schema.addColumn("col5", TajoDataTypes.Type.INT8);
+    schema.addColumn("col6", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col7", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col8", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col9", TajoDataTypes.Type.BLOB);
+    schema.addColumn("col10", TajoDataTypes.Type.INET4);
+    schema.addColumn("col11", TajoDataTypes.Type.NULL_TYPE);
+  }
+
+  /** Returns {@code suffix} resolved against the classpath resource {@code path}. */
+  public static Path getResourcePath(String path, String suffix) {
+    URL resultBaseURL = ClassLoader.getSystemResource(path);
+    return new Path(resultBaseURL.toString(), suffix);
+  }
+
+  @Test
+  public void testVarioutType() throws IOException {
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    Path tablePath = new Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json");
+    FileStatus status = FileSystem.getLocal(conf).getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Tuple tuple;
+    try { // close the scanner even when init() or an assertion below fails
+      scanner.init();
+      tuple = scanner.next();
+      assertNotNull(tuple);
+      assertNull(scanner.next()); // the fixture holds exactly one record
+    } finally {
+      scanner.close();
+    }
+    Tuple baseTuple = new VTuple(11);
+    baseTuple.put(new Datum[] {
+        DatumFactory.createBool(true), // 0
+        DatumFactory.createChar("hyunsik"), // 1
+        DatumFactory.createInt2((short) 17), // 2
+        DatumFactory.createInt4(59), // 3
+        DatumFactory.createInt8(23L), // 4
+        DatumFactory.createFloat4(77.9f), // 5
+        DatumFactory.createFloat8(271.9d), // 6
+        DatumFactory.createText("hyunsik"), // 7
+        DatumFactory.createBlob("hyunsik".getBytes()), // 8
+        DatumFactory.createInet4("192.168.0.1"), // 9
+        NullDatum.get(), // 10: col11 is absent from the fixture line
+    });
+    assertEquals(baseTuple, tuple);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json b/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
new file mode 100644
index 0000000..8ee3408
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
@@ -0,0 +1 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
index 790d5a8..f4c81c7 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -28,7 +28,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
+ <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
</property>
<!--- Fragment Class Configurations -->
@@ -41,6 +41,10 @@
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
+ <name>tajo.storage.fragment.json.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
<name>tajo.storage.fragment.raw.class</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
@@ -77,6 +81,11 @@
</property>
<property>
+ <name>tajo.storage.scanner-handler.json.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
</property>
@@ -123,6 +132,11 @@
</property>
<property>
+ <name>tajo.storage.appender-handler.json.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
<name>tajo.storage.appender-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
</property>
http://git-wip-us.apache.org/repos/asf/tajo/blob/cd38dffb/tajo-storage/src/test/resources/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc
index 611b97f..d4250a9 100644
--- a/tajo-storage/src/test/resources/testVariousTypes.avsc
+++ b/tajo-storage/src/test/resources/testVariousTypes.avsc
@@ -4,18 +4,17 @@
"name": "testVariousTypes",
"fields": [
{ "name": "col1", "type": "boolean" },
- { "name": "col2", "type": "int" },
- { "name": "col3", "type": "string" },
+ { "name": "col2", "type": "string" },
+ { "name": "col3", "type": "int" },
{ "name": "col4", "type": "int" },
- { "name": "col5", "type": "int" },
- { "name": "col6", "type": "long" },
- { "name": "col7", "type": "float" },
- { "name": "col8", "type": "double" },
- { "name": "col9", "type": "string" },
+ { "name": "col5", "type": "long" },
+ { "name": "col6", "type": "float" },
+ { "name": "col7", "type": "double" },
+ { "name": "col8", "type": "string" },
+ { "name": "col9", "type": "bytes" },
{ "name": "col10", "type": "bytes" },
- { "name": "col11", "type": "bytes" },
- { "name": "col12", "type": "null" },
- { "name": "col13", "type": "bytes" }
+ { "name": "col11", "type": "null" },
+ { "name": "col12", "type": "bytes" }
]
}