You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:36 UTC
[06/21] flink git commit: [FLINK-6711] Activate strict checkstyle for
flink-hcatalog
[FLINK-6711] Activate strict checkstyle for flink-hcatalog
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88189f2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88189f2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88189f2c
Branch: refs/heads/master
Commit: 88189f2c0c43036270f61bbc8df0cc4fb4d032e1
Parents: 43183ad
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:14:27 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:10 2017 +0200
----------------------------------------------------------------------
flink-connectors/flink-hcatalog/pom.xml | 4 +-
.../flink/hcatalog/HCatInputFormatBase.java | 39 ++++++++++----------
.../flink/hcatalog/java/HCatInputFormat.java | 33 ++++++++---------
.../flink/hcatalog/scala/HCatInputFormat.scala | 2 +-
4 files changed, 39 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/88189f2c/flink-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/pom.xml b/flink-connectors/flink-hcatalog/pom.xml
index a9fbceb..10ca36d 100644
--- a/flink-connectors/flink-hcatalog/pom.xml
+++ b/flink-connectors/flink-hcatalog/pom.xml
@@ -19,9 +19,9 @@ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
+
<modelVersion>4.0.0</modelVersion>
-
+
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/88189f2c/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
index 859b706..26f2fed 100644
--- a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -18,8 +18,8 @@
package org.apache.flink.hcatalog;
-import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.core.io.InputSplitAssigner;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -57,9 +58,9 @@ import java.util.Map;
* A InputFormat to read from HCatalog tables.
* The InputFormat supports projection (selection and order of fields) and partition filters.
*
- * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
+ * <p>Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
*
- * Note: Flink tuples might only support a limited number of fields (depending on the API).
+ * <p>Note: Flink tuples might only support a limited number of fields (depending on the API).
*
* @param <T>
*/
@@ -132,7 +133,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
// build output schema
ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
- for(String field : fields) {
+ for (String field : fields) {
fieldSchemas.add(this.outputSchema.get(field));
}
this.outputSchema = new HCatSchema(fieldSchemas);
@@ -164,7 +165,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
* Specifies that the InputFormat returns Flink tuples instead of
* {@link org.apache.hive.hcatalog.data.HCatRecord}.
*
- * Note: Flink tuples might only support a limited number of fields (depending on the API).
+ * <p>Note: Flink tuples might only support a limited number of fields (depending on the API).
*
* @return This InputFormat.
* @throws org.apache.hive.hcatalog.common.HCatException
@@ -173,8 +174,8 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
// build type information
int numFields = outputSchema.getFields().size();
- if(numFields > this.getMaxFlinkTupleSize()) {
- throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
+ if (numFields > this.getMaxFlinkTupleSize()) {
+ throw new IllegalArgumentException("Only up to " + this.getMaxFlinkTupleSize() +
" fields can be returned as Flink tuples.");
}
@@ -225,7 +226,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
case STRUCT:
return new GenericTypeInfo(List.class);
default:
- throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
+ throw new IllegalArgumentException("Unknown data type \"" + fieldSchema.getType() + "\" encountered.");
}
}
@@ -283,7 +284,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
}
HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
- for(int i = 0; i < hadoopInputSplits.length; i++){
+ for (int i = 0; i < hadoopInputSplits.length; i++){
hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
}
return hadoopInputSplits;
@@ -299,7 +300,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
TaskAttemptContext context = null;
try {
context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
- } catch(Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
@@ -316,7 +317,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
@Override
public boolean reachedEnd() throws IOException {
- if(!this.fetched) {
+ if (!this.fetched) {
fetchNext();
}
return !this.hasNext;
@@ -334,11 +335,11 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
@Override
public T nextRecord(T record) throws IOException {
- if(!this.fetched) {
+ if (!this.fetched) {
// first record
fetchNext();
}
- if(!this.hasNext) {
+ if (!this.hasNext) {
return null;
}
try {
@@ -347,13 +348,13 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
HCatRecord v = this.recordReader.getCurrentValue();
this.fetched = false;
- if(this.fieldNames.length > 0) {
+ if (this.fieldNames.length > 0) {
// return as Flink tuple
return this.buildFlinkTuple(record, v);
} else {
// return as HCatRecord
- return (T)v;
+ return (T) v;
}
} catch (InterruptedException e) {
@@ -374,7 +375,7 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
private void writeObject(ObjectOutputStream out) throws IOException {
out.writeInt(this.fieldNames.length);
- for(String fieldName : this.fieldNames) {
+ for (String fieldName : this.fieldNames) {
out.writeUTF(fieldName);
}
this.configuration.write(out);
@@ -383,19 +384,19 @@ public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopIn
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
this.fieldNames = new String[in.readInt()];
- for(int i=0; i<this.fieldNames.length; i++) {
+ for (int i = 0; i < this.fieldNames.length; i++) {
this.fieldNames[i] = in.readUTF();
}
Configuration configuration = new Configuration();
configuration.readFields(in);
- if(this.configuration == null) {
+ if (this.configuration == null) {
this.configuration = configuration;
}
this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
- this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
+ this.outputSchema = (HCatSchema) HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/88189f2c/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
index 46f3cd5..2520b34 100644
--- a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -18,9 +18,9 @@
package org.apache.flink.hcatalog.java;
-
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.hcatalog.HCatInputFormatBase;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;
@@ -29,7 +29,7 @@ import org.apache.hive.hcatalog.data.HCatRecord;
* A InputFormat to read from HCatalog tables.
* The InputFormat supports projection (selection and order of fields) and partition filters.
*
- * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * <p>Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
* Flink tuples support only up to 25 fields.
*
* @param <T>
@@ -47,7 +47,6 @@ public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
super(database, table, config);
}
-
@Override
protected int getMaxFlinkTupleSize() {
return 25;
@@ -56,10 +55,10 @@ public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
@Override
protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
- Tuple tuple = (Tuple)t;
+ Tuple tuple = (Tuple) t;
// Extract all fields from HCatRecord
- for(int i=0; i < this.fieldNames.length; i++) {
+ for (int i = 0; i < this.fieldNames.length; i++) {
// get field value
Object o = record.get(this.fieldNames[i], this.outputSchema);
@@ -69,49 +68,49 @@ public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
// need to be converted to original type.
switch(this.outputSchema.get(i).getType()) {
case INT:
- if(o instanceof String) {
+ if (o instanceof String) {
tuple.setField(Integer.parseInt((String) o), i);
} else {
tuple.setField(o, i);
}
break;
case TINYINT:
- if(o instanceof String) {
+ if (o instanceof String) {
tuple.setField(Byte.parseByte((String) o), i);
} else {
tuple.setField(o, i);
}
break;
case SMALLINT:
- if(o instanceof String) {
+ if (o instanceof String) {
tuple.setField(Short.parseShort((String) o), i);
} else {
tuple.setField(o, i);
}
break;
case BIGINT:
- if(o instanceof String) {
+ if (o instanceof String) {
tuple.setField(Long.parseLong((String) o), i);
} else {
tuple.setField(o, i);
}
break;
case BOOLEAN:
- if(o instanceof String) {
+ if (o instanceof String) {
tuple.setField(Boolean.parseBoolean((String) o), i);
} else {
tuple.setField(o, i);
}
break;
case FLOAT:
- if(o instanceof String) {
+ if (o instanceof String) {
tuple.setField(Float.parseFloat((String) o), i);
} else {
tuple.setField(o, i);
}
break;
case DOUBLE:
- if(o instanceof String) {
+ if (o instanceof String) {
tuple.setField(Double.parseDouble((String) o), i);
} else {
tuple.setField(o, i);
@@ -121,28 +120,28 @@ public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
tuple.setField(o, i);
break;
case BINARY:
- if(o instanceof String) {
+ if (o instanceof String) {
throw new RuntimeException("Cannot handle partition keys of type BINARY.");
} else {
tuple.setField(o, i);
}
break;
case ARRAY:
- if(o instanceof String) {
+ if (o instanceof String) {
throw new RuntimeException("Cannot handle partition keys of type ARRAY.");
} else {
tuple.setField(o, i);
}
break;
case MAP:
- if(o instanceof String) {
+ if (o instanceof String) {
throw new RuntimeException("Cannot handle partition keys of type MAP.");
} else {
tuple.setField(o, i);
}
break;
case STRUCT:
- if(o instanceof String) {
+ if (o instanceof String) {
throw new RuntimeException("Cannot handle partition keys of type STRUCT.");
} else {
tuple.setField(o, i);
@@ -153,7 +152,7 @@ public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
}
}
- return (T)tuple;
+ return (T) tuple;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/88189f2c/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
index 0299ee1..6491815 100644
--- a/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -36,7 +36,7 @@ class HCatInputFormat[T](
database: String,
table: String,
config: Configuration
- ) extends HCatInputFormatBase[T](database, table, config) {
+ ) extends HCatInputFormatBase[T](database, table, config) {
def this(database: String, table: String) {
this(database, table, new Configuration)