You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2015/01/27 18:28:03 UTC
[10/59] [abbrv] jena git commit: Rebrand to Jena Elephas per
community vote
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java
new file mode 100644
index 0000000..2464946
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.jsonld;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDQuadReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+/**
+ * JSON-LD input format for quads, where each input file is processed as a
+ * whole file (JSON-LD documents cannot be split line-wise) and parsed with
+ * a {@link JsonLDQuadReader}
+ */
+public class JsonLDQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new JsonLDQuadReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java
new file mode 100644
index 0000000..0e08a4b
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.jsonld;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDTripleReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+/**
+ * JSON-LD input format for triples, where each input file is processed as a
+ * whole file (JSON-LD documents cannot be split line-wise) and parsed with
+ * a {@link JsonLDTripleReader}
+ */
+public class JsonLDTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new JsonLDTripleReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java
new file mode 100644
index 0000000..6829c4d
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.nquads;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.nquads.BlockedNQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * NQuads input format where files are processed as blocks of lines rather
+ * than in a line based manner as with the {@link NQuadsInputFormat} or as
+ * whole files with the {@link WholeFileNQuadsInputFormat}
+ * <p>
+ * This provides a compromise between the higher parser setup overhead of
+ * creating more parsers and the benefit of being able to split input files
+ * over multiple mappers.
+ * </p>
+ */
+public class BlockedNQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new BlockedNQuadsReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
new file mode 100644
index 0000000..802fbea
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.nquads;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * NQuads input format that processes input files line by line, handing each
+ * line to a {@link NQuadsReader} so that files may be split across mappers
+ */
+public class NQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new NQuadsReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
new file mode 100644
index 0000000..128d079
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.nquads;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * NQuads input format which reads each input file in its entirety rather
+ * than line by line as {@link NQuadsInputFormat} does
+ * <p>
+ * The advantage is that only a single parser needs to be set up per file,
+ * at the cost that a file can no longer be split across multiple mappers.
+ * </p>
+ */
+public class WholeFileNQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ WholeFileNQuadsReader reader = new WholeFileNQuadsReader();
+ return reader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
new file mode 100644
index 0000000..292167b
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.ntriples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * NTriples input format which reads input files in blocks of lines, sitting
+ * between the line based {@link NTriplesInputFormat} and the whole file
+ * {@link WholeFileNTriplesInputFormat}
+ * <p>
+ * This trades off the setup overhead of creating more parsers against the
+ * ability to still split input files over multiple mappers.
+ * </p>
+ */
+public class BlockedNTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ BlockedNTriplesReader reader = new BlockedNTriplesReader();
+ return reader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
new file mode 100644
index 0000000..1694c87
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.ntriples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * NTriples input format that processes input files line by line, handing
+ * each line to a {@link NTriplesReader} so that files may be split across
+ * mappers
+ */
+public class NTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new NTriplesReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
new file mode 100644
index 0000000..31c1252
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.ntriples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * NTriples input format which reads each input file in its entirety rather
+ * than line by line as {@link NTriplesInputFormat} does
+ * <p>
+ * The advantage is that only a single parser needs to be set up per file,
+ * at the cost that a file can no longer be split across multiple mappers.
+ * </p>
+ */
+public class WholeFileNTriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ WholeFileNTriplesReader reader = new WholeFileNTriplesReader();
+ return reader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
new file mode 100644
index 0000000..e5a7940
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.rdfjson;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * RDF/JSON input format, processing each input file as a whole file and
+ * parsing it with a {@link RdfJsonReader}
+ */
+public class RdfJsonInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ RdfJsonReader reader = new RdfJsonReader();
+ return reader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
new file mode 100644
index 0000000..4deb925
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.rdfxml;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * RDF/XML input format, processing each input file as a whole file and
+ * parsing it with a {@link RdfXmlReader}
+ */
+public class RdfXmlInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ RdfXmlReader reader = new RdfXmlReader();
+ return reader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
new file mode 100644
index 0000000..56d031e
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.ReaderRIOT;
+import org.apache.jena.riot.lang.PipedRDFIterator;
+import org.apache.jena.riot.lang.PipedRDFStream;
+import org.apache.jena.riot.system.ParserProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation for a record reader that reads records from blocks
+ * of files, this is a hybrid between {@link AbstractLineBasedNodeTupleReader}
+ * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by
+ * formats which can be split by lines but reduces the overhead by parsing the
+ * split as a whole rather than as individual lines.
+ * <p>
+ * The keys produced are the approximate position in the file at which a tuple
+ * was found and the values will be node tuples. Positions are approximate
+ * because they are recorded after the point at which the most recent tuple was
+ * parsed from the input thus they reflect the approximate position in the
+ * stream immediately after which the triple was found.
+ * </p>
+ * <p>
+ * Parsing happens on a background daemon thread which pushes tuples through a
+ * piped stream/iterator pair; {@link #nextKeyValue()} consumes from the
+ * iterator on the caller's thread.
+ * </p>
+ *
+ * @param <TValue>
+ * Value type
+ * @param <T>
+ * Tuple type
+ */
+public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class);
+ // NOTE(review): despite the plural name this holds a single codec; it is
+ // null when the input file is not compressed
+ private CompressionCodec compressionCodecs;
+ // Input over the split, tracked so we can report how many bytes were read
+ private TrackableInputStream input;
+ private LongWritable key;
+ private long start, length;
+ private T tuple;
+ // Piped stream/iterator pair used to hand tuples from the background
+ // parser thread to nextKeyValue()
+ private TrackedPipedRDFStream<TValue> stream;
+ private PipedRDFIterator<TValue> iter;
+ private Thread parserThread;
+ private boolean finished = false;
+ private boolean ignoreBadTuples = true;
+ // State shared with the parser thread, guarded by synchronizing on
+ // parserThread (see setParserFinished/waitForParserFinished)
+ private boolean parserFinished = false;
+ private Throwable parserError = null;
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+ LOG.debug("initialize({}, {})", genericSplit, context);
+
+ // Assuming file split
+ if (!(genericSplit instanceof FileSplit))
+ throw new IOException("This record reader only supports FileSplit inputs");
+ FileSplit split = (FileSplit) genericSplit;
+
+ // Configuration
+ Configuration config = context.getConfiguration();
+ this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
+ if (this.ignoreBadTuples)
+ LOG.warn(
+ "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour",
+ RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
+
+ // Figure out what portion of the file to read
+ start = split.getStart();
+ long end = start + split.getLength();
+ final Path file = split.getPath();
+ long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
+ boolean readToEnd = end == totalLength;
+ CompressionCodecFactory factory = new CompressionCodecFactory(config);
+ this.compressionCodecs = factory.getCodec(file);
+
+ LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));
+
+ // Open the file and prepare the input stream
+ FileSystem fs = file.getFileSystem(config);
+ FSDataInputStream fileIn = fs.open(file);
+ this.length = split.getLength();
+ if (start > 0)
+ fileIn.seek(start);
+
+ if (this.compressionCodecs != null) {
+ // Compressed input
+ // For compressed input NLineInputFormat will have failed to find
+ // any line breaks and will give us a split from 0 -> (length - 1)
+ // Add 1 and re-verify readToEnd so we can abort correctly if ever
+ // given a partial split of a compressed file
+ end++;
+ readToEnd = end == totalLength;
+ if (start > 0 || !readToEnd)
+ throw new IOException("This record reader can only be used with compressed input where the split is a whole file");
+ input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
+ } else {
+ // Uncompressed input
+
+ if (readToEnd) {
+ input = new TrackedInputStream(fileIn);
+ } else {
+ // Need to limit the portion of the file we are reading
+ input = new BlockInputStream(fileIn, split.getLength());
+ }
+ }
+
+ // Set up background thread for parser
+ iter = this.getPipedIterator();
+ this.stream = this.getPipedStream(iter, this.input);
+ ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
+ Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
+ this.parserThread = new Thread(parserRunnable);
+ // Daemon thread so a stuck parser does not prevent JVM exit
+ this.parserThread.setDaemon(true);
+ this.parserThread.start();
+ }
+
+ /**
+ * Gets the RDF iterator to use
+ *
+ * @return Iterator
+ */
+ protected abstract PipedRDFIterator<TValue> getPipedIterator();
+
+ /**
+ * Gets the RDF stream to parse to
+ *
+ * @param iterator
+ * Iterator
+ * @param input
+ * Input stream whose position the returned stream tracks
+ * @return RDF stream
+ */
+ protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
+
+ /**
+ * Gets the RDF language to use for parsing
+ *
+ * @return RDF language
+ */
+ protected abstract Lang getRdfLanguage();
+
+ /**
+ * Creates the runnable upon which the parsing will run
+ *
+ * @param reader
+ * Reader to notify when parsing finishes or fails
+ * @param input
+ * Input
+ * @param stream
+ * Stream
+ * @param lang
+ * Language to use for parsing
+ * @param profile
+ * Parser profile
+ * @return Parser runnable
+ */
+ private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input,
+ final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
+ riotReader.setParserProfile(profile);
+ riotReader.read(input, null, lang.getContentType(), stream, null);
+ // Success is signalled with a null error
+ reader.setParserFinished(null);
+ } catch (Throwable e) {
+ reader.setParserFinished(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Sets the parser thread finished state
+ *
+ * @param e
+ * Error (if any)
+ */
+ private void setParserFinished(Throwable e) {
+ synchronized (this.parserThread) {
+ this.parserError = e;
+ this.parserFinished = true;
+ }
+ }
+
+ /**
+ * Waits for the parser thread to have reported as finished
+ * <p>
+ * Polls the shared finished flag at 50ms intervals rather than using
+ * wait/notify.
+ * </p>
+ *
+ * @throws InterruptedException
+ */
+ private void waitForParserFinished() throws InterruptedException {
+ do {
+ synchronized (this.parserThread) {
+ if (this.parserFinished)
+ return;
+ }
+ Thread.sleep(50);
+ } while (true);
+ }
+
+ /**
+ * Creates an instance of a writable tuple from the given tuple value
+ *
+ * @param tuple
+ * Tuple value
+ * @return Writable tuple
+ */
+ protected abstract T createInstance(TValue tuple);
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ // Reuse key for efficiency
+ if (key == null) {
+ key = new LongWritable();
+ }
+
+ if (this.finished)
+ return false;
+
+ try {
+ if (this.iter.hasNext()) {
+ // Position will be relative to the start for the split we're
+ // processing
+ Long l = this.start + this.stream.getPosition();
+ // NOTE(review): l is produced by primitive addition so it can
+ // never be null here; if getPosition() returned a null Long the
+ // addition itself would NPE first — confirm getPosition()'s
+ // contract and whether this check is intended
+ if (l != null) {
+ this.key.set(l);
+ // For compressed input the actual length from which we
+ // calculate progress is likely less than the actual
+ // uncompressed length so we need to increment the
+ // length as we go along
+ // We always add 1 more than the current length because we
+ // don't want to report 100% progress until we really have
+ // finished
+ if (this.compressionCodecs != null && l > this.length)
+ this.length = l + 1;
+ }
+ this.tuple = this.createInstance(this.iter.next());
+ return true;
+ } else {
+ // Need to ensure that the parser thread has finished in order
+ // to determine whether we finished without error
+ this.waitForParserFinished();
+ if (this.parserError != null) {
+ LOG.error("Error parsing block, aborting further parsing", this.parserError);
+ if (!this.ignoreBadTuples)
+ throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing",
+ this.parserError);
+ }
+
+ this.key = null;
+ this.tuple = null;
+ this.finished = true;
+ // This is necessary so that when compressed input is used we
+ // report 100% progress once we've reached the genuine end of
+ // the stream
+ if (this.compressionCodecs != null)
+ this.length--;
+ return false;
+ }
+ } catch (IOException e) {
+ throw e;
+ } catch (Throwable e) {
+ // Failed to read the tuple on this line
+ LOG.error("Error parsing block, aborting further parsing", e);
+ if (!this.ignoreBadTuples) {
+ this.iter.close();
+ throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e);
+ }
+ this.key = null;
+ this.tuple = null;
+ this.finished = true;
+ return false;
+ }
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return this.key;
+ }
+
+ @Override
+ public T getCurrentValue() throws IOException, InterruptedException {
+ return this.tuple;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ float progress = 0.0f;
+ if (this.key == null) {
+ // We've either not started or we've finished
+ progress = (this.finished ? 1.0f : 0.0f);
+ } else if (this.key.get() == Long.MIN_VALUE) {
+ // We don't have a position so we've either in-progress or finished
+ progress = (this.finished ? 1.0f : 0.5f);
+ } else {
+ // We're some way through the file
+ progress = (this.key.get() - this.start) / (float) this.length;
+ }
+ LOG.debug("getProgress() --> {}", progress);
+ return progress;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Any still-running parser thread is a daemon so it will not block exit
+ this.iter.close();
+ this.input.close();
+ this.finished = true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
new file mode 100644
index 0000000..2279444
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * An abstract record reader for block-based quad formats.
+ * <p>
+ * Supplies a {@link PipedRDFIterator} and a {@link TrackedPipedQuadsStream} so
+ * the base class can parse quads on its background thread.
+ * </p>
+ */
+public abstract class AbstractBlockBasedQuadReader extends AbstractBlockBasedNodeTupleReader<Quad, QuadWritable> {
+
+ @Override
+ protected PipedRDFIterator<Quad> getPipedIterator() {
+ return new PipedRDFIterator<Quad>();
+ }
+
+ @Override
+ protected TrackedPipedRDFStream<Quad> getPipedStream(PipedRDFIterator<Quad> iterator, TrackableInputStream input) {
+ return new TrackedPipedQuadsStream(iterator, input);
+ }
+
+ @Override
+ protected QuadWritable createInstance(Quad tuple) {
+ return new QuadWritable(tuple);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
new file mode 100644
index 0000000..2afd329
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * An abstract record reader for block-based triple formats.
+ * <p>
+ * Supplies a {@link PipedRDFIterator} and a {@link TrackedPipedTriplesStream}
+ * so the base class can parse triples on its background thread.
+ * </p>
+ */
+public abstract class AbstractBlockBasedTripleReader extends AbstractBlockBasedNodeTupleReader<Triple, TripleWritable> {
+
+ @Override
+ protected PipedRDFIterator<Triple> getPipedIterator() {
+ return new PipedRDFIterator<Triple>();
+ }
+
+ @Override
+ protected TrackedPipedRDFStream<Triple> getPipedStream(PipedRDFIterator<Triple> iterator, TrackableInputStream input) {
+ return new TrackedPipedTriplesStream(iterator, input);
+ }
+
+ @Override
+ protected TripleWritable createInstance(Triple tuple) {
+ return new TripleWritable(tuple);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
new file mode 100644
index 0000000..6c1abe9
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.LineReader;
+import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.system.ParserProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of a record reader that reads records from line
+ * based tuple formats. This only supports reading from file splits currently.
+ * <p>
+ * The keys produced are the position of the line in the file and the values
+ * will be node tuples
+ * </p>
+ *
+ * @param <TValue>
+ * Tuple value type
+ * @param <T>
+ * Writable tuple type
+ */
+public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
+ private CompressionCodecFactory compressionCodecs = null;
+ // estLength is used for progress estimation with compressed input where the
+ // uncompressed length is unknown up front
+ private long start, pos, end, estLength;
+ private int maxLineLength;
+ private LineReader in;
+ private LongWritable key = null;
+ private Text value = null;
+ private T tuple = null;
+ private ParserProfile profile = null;
+ private boolean ignoreBadTuples = true;
+
+ @Override
+ public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+ LOG.debug("initialize({}, {})", genericSplit, context);
+
+ // Assuming file split
+ if (!(genericSplit instanceof FileSplit))
+ throw new IOException("This record reader only supports FileSplit inputs");
+ FileSplit split = (FileSplit) genericSplit;
+
+ // Configuration
+ profile = RdfIOUtils.createParserProfile(context, split.getPath());
+ Configuration config = context.getConfiguration();
+ this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
+ if (this.ignoreBadTuples)
+ LOG.warn(
+ "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrown. Consider setting {} to false to disable this behaviour",
+ RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
+
+ // Figure out what portion of the file to read
+ this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+ long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
+ compressionCodecs = new CompressionCodecFactory(config);
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
+
+ LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));
+
+ // Open the file and seek to the start of the split
+ FileSystem fs = file.getFileSystem(config);
+ FSDataInputStream fileIn = fs.open(file);
+ boolean skipFirstLine = false;
+ if (codec != null) {
+ // Compressed input
+ // For compressed input NLineInputFormat will have failed to find
+ // any line breaks and will give us a split from 0 -> (length - 1)
+ // Add 1 and verify we got complete split
+ if (totalLength > split.getLength() + 1)
+ throw new IOException("This record reader can only be used with compressed input where the split covers the whole file");
+ in = new LineReader(codec.createInputStream(fileIn), config);
+ estLength = end;
+ end = Long.MAX_VALUE;
+ } else {
+ // Uncompressed input
+ if (start != 0) {
+ skipFirstLine = true;
+ --start;
+ fileIn.seek(start);
+ }
+ in = new LineReader(fileIn, config);
+ }
+ // Skip first line and re-establish "start".
+ // This is to do with how line reader reads lines and how
+ // NLineInputFormat will provide the split information to use
+ if (skipFirstLine) {
+ start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
+ }
+ this.pos = start;
+ }
+
+ /**
+ * Gets an iterator over the data on the current line
+ *
+ * @param line
+ * Line
+ * @param profile
+ * Parser profile
+ * @return Iterator
+ */
+ protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile);
+
+ /**
+ * Creates an instance of a writable tuple from the given tuple value
+ *
+ * @param tuple
+ * Tuple value
+ * @return Writable tuple
+ */
+ protected abstract T createInstance(TValue tuple);
+
+ @Override
+ public final boolean nextKeyValue() throws IOException, InterruptedException {
+ // Reuse key for efficiency
+ if (key == null) {
+ key = new LongWritable();
+ }
+
+ // Reset value which we use for reading lines
+ if (value == null) {
+ value = new Text();
+ }
+ tuple = null;
+
+ // Try to read the next valid line
+ int newSize = 0;
+ while (pos < end) {
+ // Read next line
+ newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
+
+ // Once we get an empty line we've reached the end of our input
+ if (newSize == 0) {
+ break;
+ }
+
+ // Update position, remember that where inputs are compressed we may
+ // be at a larger position then we expected because the length of
+ // the split is likely less than the length of the data once
+ // decompressed
+ key.set(pos);
+ pos += newSize;
+ if (pos > estLength)
+ estLength = pos + 1;
+
+ // Skip lines that exceed the line length limit that has been set
+ if (newSize >= maxLineLength) {
+ LOG.warn("Skipped oversized line of size {} at position {}", newSize, (pos - newSize));
+ continue;
+ }
+
+ // Attempt to read the tuple from current line
+ try {
+ Iterator<TValue> iter = this.getIterator(value.toString(), profile);
+ if (iter.hasNext()) {
+ tuple = this.createInstance(iter.next());
+
+ // If we reach here we've found a valid tuple so we can
+ // break out of the loop
+ break;
+ } else {
+ // Empty line/Comment line
+ LOG.debug("Valid line with no triple at position {}", (pos - newSize));
+ continue;
+ }
+ } catch (Throwable e) {
+ // Failed to read the tuple on this line
+ LOG.error("Bad tuple at position " + (pos - newSize), e);
+ if (this.ignoreBadTuples)
+ continue;
+ throw new IOException(String.format("Bad tuple at position %d", (pos - newSize)), e);
+ }
+ }
+ boolean result = this.tuple != null;
+
+ // End of input
+ if (newSize == 0) {
+ key = null;
+ value = null;
+ tuple = null;
+ result = false;
+ estLength = pos;
+ }
+ LOG.debug("nextKeyValue() --> {}", result);
+ return result;
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ LOG.debug("getCurrentKey() --> {}", key);
+ return key;
+ }
+
+ @Override
+ public T getCurrentValue() throws IOException, InterruptedException {
+ LOG.debug("getCurrentValue() --> {}", tuple);
+ return tuple;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ float progress = 0.0f;
+ if (start != end) {
+ if (end == Long.MAX_VALUE) {
+ if (estLength == 0)
+ return 1.0f;
+ // Use estimated length
+ progress = Math.min(1.0f, (pos - start) / (float) (estLength - start));
+ } else {
+ // Use actual length
+ progress = Math.min(1.0f, (pos - start) / (float) (end - start));
+ }
+ }
+ LOG.debug("getProgress() --> {}", progress);
+ return progress;
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.debug("close()");
+ if (in != null) {
+ in.close();
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
new file mode 100644
index 0000000..ac93865
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.util.Iterator;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.tokens.Tokenizer;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * An abstract reader for line based quad formats.
+ * <p>
+ * Concrete implementations supply a {@link Tokenizer} for a single line of
+ * input and an iterator that parses quads from that tokenizer.
+ * </p>
+ */
+public abstract class AbstractLineBasedQuadReader extends AbstractLineBasedNodeTupleReader<Quad, QuadWritable> {
+
+ @Override
+ protected Iterator<Quad> getIterator(String line, ParserProfile profile) {
+ Tokenizer tokenizer = getTokenizer(line);
+ return getQuadsIterator(tokenizer, profile);
+ }
+
+ @Override
+ protected QuadWritable createInstance(Quad q) {
+ return new QuadWritable(q);
+ }
+
+ /**
+ * Gets the tokenizer used to tokenize a single line of input
+ *
+ * @param line
+ * Line
+ * @return Tokenizer
+ */
+ protected abstract Tokenizer getTokenizer(String line);
+
+ /**
+ * Gets an iterator of quads over the tokenized line
+ *
+ * @param tokenizer
+ * Tokenizer
+ * @param profile
+ * Parser profile
+ * @return Quads iterator
+ */
+ protected abstract Iterator<Quad> getQuadsIterator(Tokenizer tokenizer, ParserProfile profile);
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
new file mode 100644
index 0000000..a0232f5
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.util.Iterator;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.tokens.Tokenizer;
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * An abstract record reader for line based triple formats.
+ * <p>
+ * Concrete implementations supply a {@link Tokenizer} for a single line of
+ * input and an iterator that parses triples from that tokenizer.
+ * </p>
+ */
+public abstract class AbstractLineBasedTripleReader extends AbstractLineBasedNodeTupleReader<Triple, TripleWritable> {
+
+ @Override
+ protected Iterator<Triple> getIterator(String line, ParserProfile profile) {
+ Tokenizer tokenizer = getTokenizer(line);
+ return getTriplesIterator(tokenizer, profile);
+ }
+
+ @Override
+ protected TripleWritable createInstance(Triple t) {
+ return new TripleWritable(t);
+ }
+
+ /**
+ * Gets the tokenizer used to tokenize a single line of input
+ *
+ * @param line
+ * Line
+ * @return Tokenizer
+ */
+ protected abstract Tokenizer getTokenizer(String line);
+
+ /**
+ * Gets an iterator of triples over the tokenized line
+ *
+ * @param tokenizer
+ * Tokenizer
+ * @param profile
+ * Parser profile
+ * @return Triples iterator
+ */
+ protected abstract Iterator<Triple> getTriplesIterator(Tokenizer tokenizer, ParserProfile profile);
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
new file mode 100644
index 0000000..d0ffed8
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract record reader for arbitrary RDF which provides support for
+ * selecting the actual record reader to use based on detecting the RDF language
+ * from the file name
+ *
+ * @param <TValue>
+ * Tuple type
+ * @param <T>
+ * Writable tuple type
+ */
+public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+ RecordReader<LongWritable, T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
+
+ // The actual reader, chosen during initialize() based on the detected
+ // RDF language; all subsequent calls delegate to it
+ private RecordReader<LongWritable, T> reader;
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ LOG.debug("initialize({}, {})", genericSplit, context);
+
+ // Assuming file split
+ if (!(genericSplit instanceof FileSplit))
+ throw new IOException("This record reader only supports FileSplit inputs");
+
+ // Find RDF language by looking at the file extension
+ FileSplit split = (FileSplit) genericSplit;
+ Path path = split.getPath();
+ Lang lang = RDFLanguages.filenameToLang(path.getName());
+ if (lang == null)
+ throw new IOException("There is no registered RDF language for the input file " + path.toString());
+
+ // Select the record reader and initialize
+ this.reader = this.selectRecordReader(lang);
+ this.reader.initialize(split, context);
+ }
+
+ /**
+ * Selects the appropriate record reader to use for the given RDF language
+ *
+ * @param lang
+ * RDF language
+ * @return Record reader
+ * @throws IOException
+ * Should be thrown if no record reader can be selected
+ */
+ protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
+
+ // All the remaining methods simply delegate to the selected reader
+
+ @Override
+ public final boolean nextKeyValue() throws IOException, InterruptedException {
+ return this.reader.nextKeyValue();
+ }
+
+ @Override
+ public final LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return this.reader.getCurrentKey();
+ }
+
+ @Override
+ public final T getCurrentValue() throws IOException, InterruptedException {
+ return this.reader.getCurrentValue();
+ }
+
+ @Override
+ public final float getProgress() throws IOException, InterruptedException {
+ return this.reader.getProgress();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ this.reader.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
new file mode 100644
index 0000000..c2da3f7
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.ReaderRIOT;
+import org.apache.jena.riot.lang.PipedRDFIterator;
+import org.apache.jena.riot.lang.PipedRDFStream;
+import org.apache.jena.riot.system.ParserProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation for a record reader that reads records from whole
+ * files i.e. the whole file must be kept together to allow tuples to be
+ * successfully read. This only supports reading from file splits currently.
+ * <p>
+ * The keys produced are the approximate position in the file at which a tuple
+ * was found and the values will be node tuples. Positions are approximate
+ * because they are recorded after the point at which the most recent tuple was
+ * parsed from the input thus they reflect the approximate position in the
+ * stream immediately after which the triple was found.
+ * </p>
+ * <p>
+ * You should also be aware that with whole file formats syntax compressions in
+ * the format may mean that there are multiple triples produced with the same
+ * position and thus key.
+ * </p>
+ *
+ * @param <TValue>
+ * Tuple type
+ * @param <T>
+ * Writable tuple type
+ */
+public abstract class AbstractWholeFileNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
+
+ // Log against this class - was previously a copy/paste error logging
+ // against AbstractLineBasedNodeTupleReader which mislabeled log output
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractWholeFileNodeTupleReader.class);
+ // Codec (if any) detected from the input file name, null for uncompressed
+ private CompressionCodec compressionCodec;
+ private TrackedInputStream input;
+ private LongWritable key;
+ private long length;
+ private T tuple;
+ private TrackedPipedRDFStream<TValue> stream;
+ private PipedRDFIterator<TValue> iter;
+ private Thread parserThread;
+ private boolean finished = false;
+ private boolean ignoreBadTuples = true;
+ // Parser thread state, guarded by synchronizing on parserThread
+ private boolean parserFinished = false;
+ private Throwable parserError = null;
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+ LOG.debug("initialize({}, {})", genericSplit, context);
+
+ // Assuming file split
+ if (!(genericSplit instanceof FileSplit))
+ throw new IOException("This record reader only supports FileSplit inputs");
+ FileSplit split = (FileSplit) genericSplit;
+
+ // Configuration
+ Configuration config = context.getConfiguration();
+ this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
+ if (this.ignoreBadTuples)
+ LOG.warn(
+ "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour",
+ RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
+
+ // Figure out what portion of the file to read - a whole file reader
+ // requires a split covering the entire file
+ if (split.getStart() > 0)
+ throw new IOException("This record reader requires a file split which covers the entire file");
+ final Path file = split.getPath();
+ long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
+ CompressionCodecFactory factory = new CompressionCodecFactory(config);
+ this.compressionCodec = factory.getCodec(file);
+
+ // Parameterized logging avoids the eager String.format() cost
+ LOG.info("Got split with start {} and length {} for file with total length of {}", split.getStart(), split.getLength(), totalLength);
+
+ if (totalLength > split.getLength())
+ throw new IOException("This record reader requires a file split which covers the entire file");
+
+ // Open the file and prepare the input stream
+ FileSystem fs = file.getFileSystem(config);
+ FSDataInputStream fileIn = fs.open(file);
+ this.length = split.getLength();
+ if (this.compressionCodec != null) {
+ // Compressed input
+ input = new TrackedInputStream(this.compressionCodec.createInputStream(fileIn));
+ } else {
+ // Uncompressed input
+ input = new TrackedInputStream(fileIn);
+ }
+
+ // Set up background thread for parser
+ iter = this.getPipedIterator();
+ this.stream = this.getPipedStream(iter, this.input);
+ ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
+ Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
+ this.parserThread = new Thread(parserRunnable);
+ this.parserThread.setDaemon(true);
+ this.parserThread.start();
+ }
+
+ /**
+ * Gets the RDF iterator to use
+ *
+ * @return Iterator
+ */
+ protected abstract PipedRDFIterator<TValue> getPipedIterator();
+
+ /**
+ * Gets the RDF stream to parse to
+ *
+ * @param iterator
+ * Iterator
+ * @param input
+ * Input stream whose read position is tracked for progress
+ * @return RDF stream
+ */
+ protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
+
+ /**
+ * Gets the RDF language to use for parsing
+ *
+ * @return RDF language
+ */
+ protected abstract Lang getRdfLanguage();
+
+ /**
+ * Creates the runnable upon which the parsing will run
+ *
+ * @param reader
+ * Reader to notify once parsing has finished or failed
+ * @param input
+ * Input
+ * @param stream
+ * Stream
+ * @param lang
+ * Language to use for parsing
+ * @param profile
+ * Parser profile to parse with
+ * @return Parser runnable
+ */
+ private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractWholeFileNodeTupleReader reader, final InputStream input,
+ final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
+ riotReader.setParserProfile(profile);
+ riotReader.read(input, null, lang.getContentType(), stream, null);
+ reader.setParserFinished(null);
+ } catch (Throwable e) {
+ // Record the error so nextKeyValue() can report it
+ reader.setParserFinished(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Sets the parser thread finished state
+ *
+ * @param e
+ * Error (if any)
+ */
+ private void setParserFinished(Throwable e) {
+ synchronized (this.parserThread) {
+ this.parserError = e;
+ this.parserFinished = true;
+ }
+ }
+
+ /**
+ * Waits for the parser thread to have reported as finished
+ *
+ * @throws InterruptedException
+ */
+ private void waitForParserFinished() throws InterruptedException {
+ // Polling is used since the parser thread may have already reported
+ // finished before we start waiting
+ do {
+ synchronized (this.parserThread) {
+ if (this.parserFinished)
+ return;
+ }
+ Thread.sleep(50);
+ } while (true);
+ }
+
+ /**
+ * Creates an instance of a writable tuple from the given tuple value
+ *
+ * @param tuple
+ * Tuple value
+ * @return Writable tuple
+ */
+ protected abstract T createInstance(TValue tuple);
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ // Reuse key for efficiency
+ if (key == null) {
+ key = new LongWritable();
+ }
+
+ if (this.finished)
+ return false;
+
+ try {
+ if (this.iter.hasNext()) {
+ Long l = this.stream.getPosition();
+ if (l != null) {
+ this.key.set(l);
+ // For compressed input the actual length from which we
+ // calculate progress is likely less than the actual
+ // uncompressed length so we may need to increment the
+ // length as we go along
+ // We always add 1 more than the current length because we
+ // don't want to report 100% progress until we really have
+ // finished
+ if (this.compressionCodec != null && l > this.length)
+ this.length = l + 1;
+ }
+ this.tuple = this.createInstance(this.iter.next());
+ return true;
+ } else {
+ // Need to ensure that the parser thread has finished in order
+ // to determine whether we finished without error
+ this.waitForParserFinished();
+ if (this.parserError != null) {
+ LOG.error("Error parsing whole file, aborting further parsing", this.parserError);
+ if (!this.ignoreBadTuples)
+ throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing",
+ this.parserError);
+
+ }
+
+ this.key = null;
+ this.tuple = null;
+ this.finished = true;
+ // This is necessary so that when compressed input is used we
+ // report 100% progress once we've reached the genuine end of
+ // the stream
+ if (this.compressionCodec != null)
+ this.length--;
+ return false;
+ }
+ } catch (Throwable e) {
+ // Failed to read the tuple on this line
+ LOG.error("Error parsing whole file, aborting further parsing", e);
+ if (!this.ignoreBadTuples) {
+ this.iter.close();
+ throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing", e);
+ }
+ this.key = null;
+ this.tuple = null;
+ this.finished = true;
+ return false;
+ }
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return this.key;
+ }
+
+ @Override
+ public T getCurrentValue() throws IOException, InterruptedException {
+ return this.tuple;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ float progress = 0.0f;
+ if (this.key == null) {
+ // We've either not started or we've finished
+ progress = (this.finished ? 1.0f : 0.0f);
+ } else if (this.key.get() == Long.MIN_VALUE) {
+ // We don't have a position so we've either in-progress or finished
+ progress = (this.finished ? 1.0f : 0.5f);
+ } else {
+ // We're some way through the file
+ progress = this.key.get() / (float) this.length;
+ }
+ LOG.debug("getProgress() --> {}", progress);
+ return progress;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.iter.close();
+ this.input.close();
+ this.finished = true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
new file mode 100644
index 0000000..e525bea
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * An abstract record reader for whole file quad formats
+ */
+public abstract class AbstractWholeFileQuadReader extends AbstractWholeFileNodeTupleReader<Quad, QuadWritable> {
+
+ @Override
+ protected PipedRDFIterator<Quad> getPipedIterator() {
+ return new PipedRDFIterator<Quad>();
+ }
+
+ @Override
+ protected TrackedPipedRDFStream<Quad> getPipedStream(PipedRDFIterator<Quad> iterator, TrackableInputStream input) {
+ return new TrackedPipedQuadsStream(iterator, input);
+ }
+
+ @Override
+ protected QuadWritable createInstance(Quad tuple) {
+ return new QuadWritable(tuple);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
new file mode 100644
index 0000000..8710b99
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * An abstract record reader for whole file triple formats
+ */
+public abstract class AbstractWholeFileTripleReader extends AbstractWholeFileNodeTupleReader<Triple, TripleWritable> {
+
+ @Override
+ protected PipedRDFIterator<Triple> getPipedIterator() {
+ // Default buffering behaviour of the piped iterator is used
+ return new PipedRDFIterator<Triple>();
+ }
+
+ @Override
+ protected TrackedPipedRDFStream<Triple> getPipedStream(PipedRDFIterator<Triple> iterator, TrackableInputStream input) {
+ return new TrackedPipedTriplesStream(iterator, input);
+ }
+
+ @Override
+ protected TripleWritable createInstance(Triple tuple) {
+ return new TripleWritable(tuple);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
new file mode 100644
index 0000000..26b0a8b
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A record reader that reads quads from any RDF quads format
+ */
+public class QuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
+
+ @Override
+ protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
+ if (!RDFLanguages.isQuads(lang))
+ throw new IOException(
+ lang.getLabel()
+ + " is not a RDF quads format, perhaps you wanted TriplesInputFormat or TriplesOrQuadsInputFormat instead?");
+
+ // This will throw an appropriate error if the language does not support
+ // quads
+ return HadoopRdfIORegistry.createQuadReader(lang);
+ }
+
+}