You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:18 UTC

[14/29] tajo git commit: TAJO-1222: DelimitedTextFile should be tolerant against parsing errors.

TAJO-1222: DelimitedTextFile should be tolerant against parsing errors.

Closes #277


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f69938ab
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f69938ab
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f69938ab

Branch: refs/heads/hbase_storage
Commit: f69938abecd3d53968e318d97aba53d9acd3de40
Parents: 5066ac3
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 3 15:44:05 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 3 15:44:05 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/storage/StorageConstants.java   |  14 +-
 .../src/main/sphinx/table_management/csv.rst    |   5 +
 .../storage/FieldSerializerDeserializer.java    |   4 +-
 .../tajo/storage/json/JsonLineDeserializer.java |  13 +-
 .../tajo/storage/text/CSVLineDeserializer.java  |   2 +-
 .../tajo/storage/text/DelimitedTextFile.java    |  84 +++++++---
 .../tajo/storage/text/TextLineDeserializer.java |   2 +-
 .../tajo/storage/text/TextLineParsingError.java |  31 ++++
 .../tajo/storage/TestDelimitedTextFile.java     | 164 +++++++++++++++++++
 .../testErrorTolerance1.json                    |   6 +
 .../testErrorTolerance2.json                    |   4 +
 12 files changed, 303 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c03b72b..c26b8a9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,6 +5,9 @@ Release 0.9.1 - unreleased
 
   NEW FEATURES
 
+    TAJO-1222: DelimitedTextFile should be tolerant against parsing errors.
+    (hyunsik)
+
     TAJO-1026: Implement Query history persistency manager.(Hyoungjun Kim)
 
     TAJO-233: Support PostgreSQL CatalogStore. (Jihun Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index a3d8de0..459c9c9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -32,8 +32,20 @@ public class StorageConstants {
 
   public static final String TEXT_DELIMITER = "text.delimiter";
   public static final String TEXT_NULL = "text.null";
-  public static final String TEXT_SERDE_CLASS = "text.serde.class";
+  public static final String TEXT_SERDE_CLASS = "text.serde";
   public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe";
+  /**
+   * The maximum number of parsing errors to tolerate.
+   *
+   * <ul>
+   *   <li>If it is negative (e.g., -1), every parsing error is tolerated.</li>
+   *   <li>If it is 0, it does not permit any parsing error.</li>
+   *   <li>If it is some positive integer (i.e., > 0), the given number of parsing errors in each
+   *       task will be permissible</li>
+   * </ul>
+   **/
+  public static final String TEXT_ERROR_TOLERANCE_MAXNUM = "text.error-tolerance.max-num";
+  public static final String DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM = "0";
 
   @Deprecated
   public static final String SEQUENCEFILE_DELIMITER = "sequencefile.delimiter";

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-docs/src/main/sphinx/table_management/csv.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management/csv.rst b/tajo-docs/src/main/sphinx/table_management/csv.rst
index 3aba2ba..71313d6 100644
--- a/tajo-docs/src/main/sphinx/table_management/csv.rst
+++ b/tajo-docs/src/main/sphinx/table_management/csv.rst
@@ -40,6 +40,11 @@ Now, the CSV storage format provides the following physical properties.
 * ``text.null``: NULL character. The default NULL character is an empty string ``''``. Hive's default NULL character is ``'\\N'``.
 * ``compression.codec``: Compression codec. You can enable compression feature and set specified compression algorithm. The compression algorithm used to compress files. The compression codec name should be the fully qualified class name inherited from `org.apache.hadoop.io.compress.CompressionCodec <https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/compress/CompressionCodec.html>`_. By default, compression is disabled.
 * ``csvfile.serde``: custom (De)serializer class. ``org.apache.tajo.storage.TextSerializerDeserializer`` is the default (De)serializer class.
+* ``text.error-tolerance.max-num``: the maximum number of permissible parsing errors. This value should be an integer value. By default, ``text.error-tolerance.max-num`` is ``0``. According to the value, parsing errors will be handled in different ways.
+
+  * If ``text.error-tolerance.max-num < 0``, all parsing errors are ignored.
+  * If ``text.error-tolerance.max-num == 0``, no parsing error is allowed. If any error occurs, the query will fail. (default)
+  * If ``text.error-tolerance.max-num > 0``, the given number of parsing errors in each task will be permissible.
 
 The following example is to set a custom field delimiter, NULL character, and compression codec:
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
index 7df4584..0b3755d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
 import io.netty.buffer.ByteBuf;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.text.TextLineParsingError;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -30,6 +31,7 @@ public interface FieldSerializerDeserializer {
 
   public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
 
-  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException;
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
+      throws IOException, TextLineParsingError;
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
index 37cd9f3..dfe36f6 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import net.minidev.json.JSONArray;
 import net.minidev.json.JSONObject;
 import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
@@ -30,9 +31,11 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.common.exception.NotImplementedException;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -55,7 +58,7 @@ public class JsonLineDeserializer extends TextLineDeserializer {
   }
 
   @Override
-  public void deserialize(ByteBuf buf, Tuple output) throws IOException {
+  public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
     byte [] line = new byte[buf.readableBytes()];
     buf.readBytes(line);
 
@@ -170,8 +173,9 @@ public class JsonLineDeserializer extends TextLineDeserializer {
           if (jsonObject == null) {
             output.put(actualIdx, NullDatum.get());
             break;
-          } if (jsonObject instanceof String) {
-            output.put(actualIdx, DatumFactory.createBlob((String)jsonObject));
+          }
+          if (jsonObject instanceof String) {
+            output.put(actualIdx, DatumFactory.createBlob((String) jsonObject));
           } else if (jsonObject instanceof JSONArray) {
             JSONArray jsonArray = (JSONArray) jsonObject;
             byte[] bytes = new byte[jsonArray.size()];
@@ -208,7 +212,8 @@ public class JsonLineDeserializer extends TextLineDeserializer {
           throw new NotImplementedException(types[actualIdx].name() + " is not supported.");
         }
       }
-
+    } catch (ParseException pe) {
+      throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), pe);
     } catch (Throwable e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 0e2dfb0..f2eebc6 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -48,7 +48,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
     fieldSerDer = new TextFieldSerializerDeserializer();
   }
 
-  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException {
+  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
     int[] projection = targetColumnIndexes;
     if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
       return;

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 2218fae..c54131b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -48,6 +48,9 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
+import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
+
 public class DelimitedTextFile {
 
   public static final byte LF = '\n';
@@ -267,12 +270,19 @@ public class DelimitedTextFile {
     private final long startOffset;
 
     private final long endOffset;
+    /** The number of actual read records */
     private int recordCount = 0;
     private int[] targetColumnIndexes;
 
     private DelimitedLineReader reader;
     private TextLineDeserializer deserializer;
 
+    private int errorPrintOutMaxNum = 5;
+    /** Maximum number of permissible errors */
+    private int errorTorrenceMaxNum;
+    /** How many errors have occurred? */
+    private int errorNum;
+
     public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
                                     final FileFragment fragment)
         throws IOException {
@@ -284,10 +294,9 @@ public class DelimitedTextFile {
 
       startOffset = fragment.getStartKey();
       endOffset = startOffset + fragment.getEndKey();
-    }
 
-    public TextLineSerDe getLineSerde() {
-      return DelimitedTextFile.getLineSerde(meta);
+      errorTorrenceMaxNum =
+          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
     }
 
     @Override
@@ -295,6 +304,7 @@ public class DelimitedTextFile {
       if (reader != null) {
         reader.close();
       }
+
       reader = new DelimitedLineReader(conf, fragment);
       reader.init();
       recordCount = 0;
@@ -322,15 +332,8 @@ public class DelimitedTextFile {
       deserializer.init();
     }
 
-    public ByteBuf readLine() throws IOException {
-      ByteBuf buf = reader.readLine();
-      if (buf == null) {
-        return null;
-      } else {
-        recordCount++;
-      }
-
-      return buf;
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
     }
 
     @Override
@@ -355,21 +358,60 @@ public class DelimitedTextFile {
 
     @Override
     public Tuple next() throws IOException {
+
+      if (!reader.isReadable()) {
+        return null;
+      }
+
+      if (targets.length == 0) {
+        return EmptyTuple.get();
+      }
+
+      VTuple tuple = new VTuple(schema.size());
+
       try {
-        if (!reader.isReadable()) return null;
 
-        ByteBuf buf = readLine();
-        if (buf == null) return null;
+        // this loop will continue until one tuple is built or EOS (end of stream) is reached.
+        do {
 
-        if (targets.length == 0) {
-          return EmptyTuple.get();
-        }
+          ByteBuf buf = reader.readLine();
+          if (buf == null) {
+            return null;
+          }
+
+          try {
+
+            deserializer.deserialize(buf, tuple);
+            // if a line is read normally, it exits this loop.
+            break;
+
+          } catch (TextLineParsingError tae) {
+
+            errorNum++;
+
+            // suppress too many log prints, which probably cause performance degradation
+            if (errorNum < errorPrintOutMaxNum) {
+              LOG.warn("Ignore JSON Parse Error (" + errorNum + "): ", tae);
+            }
+
+            // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0),
+            // it checks if the number of parsing error exceeds the max limit.
+            // Otherwise, it will ignore all parsing errors.
+            if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) {
+              throw tae;
+            }
+            continue;
+          }
+
+        } while (reader.isReadable()); // continue until EOS
+
+        // recordCount means the number of actual read records. We increment the count here.
+        recordCount++;
 
-        VTuple tuple = new VTuple(schema.size());
-        deserializer.deserialize(buf, tuple);
         return tuple;
+
       } catch (Throwable t) {
-        LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
+        LOG.error(t);
         throw new IOException(t);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index b0d3c3a..7ebfa79 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -51,7 +51,7 @@ public abstract class TextLineDeserializer {
    * @param output Tuple to be filled with read fields
    * @throws java.io.IOException
    */
-  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException;
+  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
 
   /**
    * Release external resources

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
new file mode 100644
index 0000000..f0bae5e
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+public class TextLineParsingError extends Exception {
+  // Checked exception raised for one unparsable text line, so a caller can catch it per line.
+  public TextLineParsingError(Throwable t) {
+    super(t);
+  }
+  /** Builds "[cause message], Error line: [message]" and keeps {@code t} as the cause. */
+  public TextLineParsingError(String message, Throwable t) {
+    super(t.getMessage() + ", Error line: " + message, t);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
new file mode 100644
index 0000000..93fb12b
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+public class TestDelimitedTextFile {
+
+  private static Schema schema = new Schema();
+
+  private static Tuple baseTuple = new VTuple(10);
+
+  static {
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+
+    baseTuple.put(new Datum[] {
+        DatumFactory.createBool(true),                // 0
+        DatumFactory.createChar("hyunsik"),           // 1
+        DatumFactory.createInt2((short) 17),          // 2
+        DatumFactory.createInt4(59),                  // 3
+        DatumFactory.createInt8(23l),                 // 4
+        DatumFactory.createFloat4(77.9f),             // 5
+        DatumFactory.createFloat8(271.9d),            // 6
+        DatumFactory.createText("hyunsik"),           // 7
+        DatumFactory.createBlob("hyunsik".getBytes()),// 8
+        DatumFactory.createInet4("192.168.0.1"),      // 9
+    });
+  }
+
+  public static Path getResourcePath(String path, String suffix) {
+    URL resultBaseURL = ClassLoader.getSystemResource(path);
+    return new Path(resultBaseURL.toString(), suffix);
+  }
+
+  public static Path getResultPath(Class clazz, String fileName) {
+    return new Path (getResourcePath("results", clazz.getSimpleName()), fileName);
+  }
+
+  public static String getResultText(Class clazz, String fileName) throws IOException {
+    FileSystem localFS = FileSystem.getLocal(new Configuration());
+    Path path = getResultPath(clazz, fileName);
+    Preconditions.checkState(localFS.exists(path) && localFS.isFile(path));
+    return FileUtil.readTextFile(new File(path.toUri()));
+  }
+
+  private static final FileFragment getFileFragment(String fileName) throws IOException {
+    TajoConf conf = new TajoConf();
+    Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName);
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(tablePath);
+    return new FileFragment("table", tablePath, 0, status.getLen());
+  }
+
+  @Test
+  public void testIgnoreAllErrors() throws IOException {
+    TajoConf conf = new TajoConf();
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
+    FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple tuple;
+    int i = 0;
+    while ((tuple = scanner.next()) != null) {
+      assertEquals(baseTuple, tuple);
+      i++;
+    }
+    assertEquals(3, i);
+    scanner.close();
+  }
+
+  @Test
+  public void testIgnoreOneErrorTolerance() throws IOException {
+
+
+    TajoConf conf = new TajoConf();
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
+    FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    assertNotNull(scanner.next());
+    assertNotNull(scanner.next());
+    try {
+      scanner.next();
+    } catch (IOException ioe) {
+      System.out.println(ioe);
+      return;
+    } finally {
+      scanner.close();
+    }
+    fail();
+  }
+
+  @Test
+  public void testNoErrorTolerance() throws IOException {
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
+    FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    try {
+      scanner.next();
+    } catch (IOException ioe) {
+      return;
+    } finally {
+      scanner.close();
+    }
+    fail();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
new file mode 100644
index 0000000..739dfe7
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
@@ -0,0 +1,6 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
new file mode 100644
index 0000000..8256b72
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
@@ -0,0 +1,4 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
\ No newline at end of file