You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by rm...@apache.org on 2012/10/31 06:53:44 UTC
svn commit: r1403993 [1/2] - in /gora/branches/goraamazon:
gora-accumulo/src/main/java/org/apache/gora/accumulo/store/
gora-cassandra/src/main/java/org/apache/gora/cassandra/store/
gora-core/src/main/java/org/apache/gora/avro/store/ gora-core/src/main/...
Author: rmarroquin
Date: Wed Oct 31 05:53:43 2012
New Revision: 1403993
URL: http://svn.apache.org/viewvc?rev=1403993&view=rev
Log:
Committing new patch for changes in the way exceptions were being handled.
Modified:
gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
gora/branches/goraamazon/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/query/impl/FileSplitPartitionQuery.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStore.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java
gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java
gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
gora/branches/goraamazon/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
gora/branches/goraamazon/gora-sql/src/main/java/org/apache/gora/sql/store/SqlStore.java
Modified: gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java (original)
+++ gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java Wed Oct 31 05:53:43 2012
@@ -97,6 +97,8 @@ import org.apache.hadoop.io.Text;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
@@ -116,6 +118,8 @@ public class AccumuloStore<K,T extends P
private AuthInfo authInfo;
private Encoder encoder;
+ public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class);
+
public Object fromBytes(Schema schema, byte data[]) {
return fromBytes(encoder, schema, data);
}
@@ -233,46 +237,51 @@ public class AccumuloStore<K,T extends P
}
@Override
- public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws IOException {
- super.initialize(keyClass, persistentClass, properties);
-
- String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
- String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
- String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
- String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
-
- mapping = readMapping(mappingFile);
-
- if (mapping.encoder == null || mapping.encoder.equals("")) {
- encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
- } else {
+ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+ try{
+ super.initialize(keyClass, persistentClass, properties);
+
+ String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
+ String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
+ String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
+ String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
+
+ mapping = readMapping(mappingFile);
+
+ if (mapping.encoder == null || mapping.encoder.equals("")) {
+ encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
+ } else {
+ try {
+ encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
try {
- encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
+ if (mock == null || !mock.equals("true")) {
+ String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
+ String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
+ conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
+ authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
+ } else {
+ conn = new MockInstance().getConnector(user, password);
+ }
+
+ if (autoCreateSchema)
+ createSchema();
+ } catch (AccumuloException e) {
throw new IOException(e);
- } catch (ClassNotFoundException e) {
+ } catch (AccumuloSecurityException e) {
throw new IOException(e);
}
- }
-
- try {
- if (mock == null || !mock.equals("true")) {
- String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
- String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
- conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
- authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
- } else {
- conn = new MockInstance().getConnector(user, password);
- }
-
- if (autoCreateSchema)
- createSchema();
- } catch (AccumuloException e) {
- throw new IOException(e);
- } catch (AccumuloSecurityException e) {
- throw new IOException(e);
+ }catch(IOException e){
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
}
}
@@ -341,7 +350,7 @@ public class AccumuloStore<K,T extends P
}
@Override
- public void createSchema() throws IOException {
+ public void createSchema() {
try {
conn.tableOperations().create(mapping.tableName);
Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
@@ -350,32 +359,38 @@ public class AccumuloStore<K,T extends P
}
} catch (AccumuloException e) {
- throw new IOException(e);
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
} catch (AccumuloSecurityException e) {
- throw new IOException(e);
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
} catch (TableExistsException e) {
- return;
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
}
}
@Override
- public void deleteSchema() throws IOException {
+ public void deleteSchema() {
try {
if (batchWriter != null)
batchWriter.close();
batchWriter = null;
conn.tableOperations().delete(mapping.tableName);
} catch (AccumuloException e) {
- throw new IOException(e);
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
} catch (AccumuloSecurityException e) {
- throw new IOException(e);
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
} catch (TableNotFoundException e) {
- return;
- }
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ }
}
@Override
- public boolean schemaExists() throws IOException {
+ public boolean schemaExists() {
return conn.tableOperations().exists(mapping.tableName);
}
@@ -475,7 +490,7 @@ public class AccumuloStore<K,T extends P
}
@Override
- public T get(K key, String[] fields) throws IOException {
+ public T get(K key, String[] fields) {
try {
// TODO make isolated scanner optional?
Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
@@ -490,103 +505,115 @@ public class AccumuloStore<K,T extends P
return null;
return persistent;
} catch (TableNotFoundException e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ return null;
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
return null;
}
}
@Override
- public void put(K key, T val) throws IOException {
+ public void put(K key, T val) {
- Mutation m = new Mutation(new Text(toBytes(key)));
-
- Schema schema = val.getSchema();
- StateManager stateManager = val.getStateManager();
-
- Iterator<Field> iter = schema.getFields().iterator();
-
- int count = 0;
- for (int i = 0; iter.hasNext(); i++) {
- Field field = iter.next();
- if (!stateManager.isDirty(val, i)) {
- continue;
- }
+ try{
+ Mutation m = new Mutation(new Text(toBytes(key)));
- Object o = val.get(i);
- Pair<Text,Text> col = mapping.fieldMap.get(field.name());
-
- switch (field.schema().getType()) {
- case MAP:
- if (o instanceof StatefulMap) {
- StatefulMap map = (StatefulMap) o;
- Set<?> es = map.states().entrySet();
- for (Object entry : es) {
- Object mapKey = ((Entry) entry).getKey();
- State state = (State) ((Entry) entry).getValue();
-
- switch (state) {
- case NEW:
- case DIRTY:
- m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
- count++;
- break;
- case DELETED:
- m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
- count++;
- break;
+ Schema schema = val.getSchema();
+ StateManager stateManager = val.getStateManager();
+
+ Iterator<Field> iter = schema.getFields().iterator();
+
+ int count = 0;
+ for (int i = 0; iter.hasNext(); i++) {
+ Field field = iter.next();
+ if (!stateManager.isDirty(val, i)) {
+ continue;
+ }
+
+ Object o = val.get(i);
+ Pair<Text,Text> col = mapping.fieldMap.get(field.name());
+
+ switch (field.schema().getType()) {
+ case MAP:
+ if (o instanceof StatefulMap) {
+ StatefulMap map = (StatefulMap) o;
+ Set<?> es = map.states().entrySet();
+ for (Object entry : es) {
+ Object mapKey = ((Entry) entry).getKey();
+ State state = (State) ((Entry) entry).getValue();
+
+ switch (state) {
+ case NEW:
+ case DIRTY:
+ m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
+ count++;
+ break;
+ case DELETED:
+ m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
+ count++;
+ break;
+ }
+
+ }
+ } else {
+ Map map = (Map) o;
+ Set<?> es = map.entrySet();
+ for (Object entry : es) {
+ Object mapKey = ((Entry) entry).getKey();
+ Object mapVal = ((Entry) entry).getValue();
+ m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
+ count++;
}
-
}
- } else {
- Map map = (Map) o;
- Set<?> es = map.entrySet();
- for (Object entry : es) {
- Object mapKey = ((Entry) entry).getKey();
- Object mapVal = ((Entry) entry).getValue();
- m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
+ break;
+ case ARRAY:
+ GenericArray array = (GenericArray) o;
+ int j = 0;
+ for (Object item : array) {
+ m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
count++;
}
- }
- break;
- case ARRAY:
- GenericArray array = (GenericArray) o;
- int j = 0;
- for (Object item : array) {
- m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
+ break;
+ case RECORD:
+ SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ BinaryEncoder encoder = new BinaryEncoder(os);
+ writer.write(o, encoder);
+ encoder.flush();
+ m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
+ break;
+ default:
+ m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
count++;
- }
- break;
- case RECORD:
- SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- BinaryEncoder encoder = new BinaryEncoder(os);
- writer.write(o, encoder);
- encoder.flush();
- m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
- break;
- default:
- m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
- count++;
+ }
+
}
-
+
+ if (count > 0)
+ try {
+ getBatchWriter().addMutation(m);
+ } catch (MutationsRejectedException e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
}
-
- if (count > 0)
- try {
- getBatchWriter().addMutation(m);
- } catch (MutationsRejectedException e) {
- throw new IOException(e);
- }
}
@Override
- public boolean delete(K key) throws IOException {
+ public boolean delete(K key) {
Query<K,T> q = newQuery();
q.setKey(key);
return deleteByQuery(q) > 0;
}
@Override
- public long deleteByQuery(Query<K,T> query) throws IOException {
+ public long deleteByQuery(Query<K,T> query) {
try {
Scanner scanner = createScanner(query);
// add iterator that drops values on the server side
@@ -613,9 +640,17 @@ public class AccumuloStore<K,T extends P
return count;
} catch (TableNotFoundException e) {
// TODO return 0?
- throw new IOException(e);
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ return 0;
} catch (MutationsRejectedException e) {
- throw new IOException(e);
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ return 0;
+ } catch (IOException e){
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ return 0;
}
}
@@ -654,14 +689,16 @@ public class AccumuloStore<K,T extends P
}
@Override
- public Result<K,T> execute(Query<K,T> query) throws IOException {
+ public Result<K,T> execute(Query<K,T> query) {
try {
Scanner scanner = createScanner(query);
return new AccumuloResult<K,T>(this, query, scanner);
} catch (TableNotFoundException e) {
// TODO return empty result?
- throw new IOException(e);
- }
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ return null;
+ }
}
@Override
@@ -817,26 +854,27 @@ public class AccumuloStore<K,T extends P
}
@Override
- public void flush() throws IOException {
+ public void flush() {
try {
if (batchWriter != null) {
batchWriter.flush();
}
} catch (MutationsRejectedException e) {
- throw new IOException(e);
- }
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ }
}
@Override
- public void close() throws IOException {
+ public void close() {
try {
if (batchWriter != null) {
batchWriter.close();
batchWriter = null;
}
} catch (MutationsRejectedException e) {
- throw new IOException(e);
- }
-
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ }
}
}
Modified: gora/branches/goraamazon/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/branches/goraamazon/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Oct 31 05:53:43 2012
@@ -78,18 +78,18 @@ public class CassandraStore<K, T extends
// this.cassandraClient.initialize();
}
- public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) throws IOException {
- super.initialize(keyClass, persistent, properties);
+ public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) {
try {
+ super.initialize(keyClass, persistent, properties);
this.cassandraClient.initialize(keyClass, persistent);
- }
- catch (Exception e) {
- throw new IOException(e.getMessage(), e);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
}
}
@Override
- public void close() throws IOException {
+ public void close() {
LOG.debug("close");
flush();
}
@@ -101,25 +101,25 @@ public class CassandraStore<K, T extends
}
@Override
- public boolean delete(K key) throws IOException {
+ public boolean delete(K key) {
LOG.debug("delete " + key);
return false;
}
@Override
- public long deleteByQuery(Query<K, T> query) throws IOException {
+ public long deleteByQuery(Query<K, T> query) {
LOG.debug("delete by query " + query);
return 0;
}
@Override
- public void deleteSchema() throws IOException {
+ public void deleteSchema() {
LOG.debug("delete schema");
this.cassandraClient.dropKeyspace();
}
@Override
- public Result<K, T> execute(Query<K, T> query) throws IOException {
+ public Result<K, T> execute(Query<K, T> query) {
Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
@@ -207,7 +207,7 @@ public class CassandraStore<K, T extends
* @see org.apache.gora.store.DataStore#flush()
*/
@Override
- public void flush() throws IOException {
+ public void flush() {
Set<K> keys = this.buffer.keySet();
@@ -236,14 +236,20 @@ public class CassandraStore<K, T extends
}
@Override
- public T get(K key, String[] fields) throws IOException, Exception {
+ public T get(K key, String[] fields) {
CassandraQuery<K,T> query = new CassandraQuery<K,T>();
query.setDataStore(this);
query.setKeyRange(key, key);
query.setFields(fields);
query.setLimit(1);
Result<K,T> result = execute(query);
- boolean hasResult = result.next();
+ boolean hasResult = false;
+ try {
+ hasResult = result.next();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
return hasResult ? result.get() : null;
}
@@ -262,7 +268,7 @@ public class CassandraStore<K, T extends
*/
@Override
public String getSchemaName() {
- return this.cassandraClient.getKeyspaceName();
+ return this.cassandraClient.getKeyspaceName();
}
@Override
@@ -277,7 +283,7 @@ public class CassandraStore<K, T extends
* @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
*/
@Override
- public void put(K key, T value) throws IOException {
+ public void put(K key, T value) {
T p = (T) value.newInstance(new StateManagerImpl());
Schema schema = value.getSchema();
for (Field field: schema.getFields()) {
@@ -389,7 +395,7 @@ public class CassandraStore<K, T extends
}
@Override
- public boolean schemaExists() throws IOException {
+ public boolean schemaExists() {
LOG.info("schema exists");
return cassandraClient.keyspaceExists();
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java Wed Oct 31 05:53:43 2012
@@ -45,6 +45,9 @@ import org.apache.gora.util.OperationNot
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* An adapter DataStore for binary-compatible Avro serializations.
* AvroDataStore supports Binary and JSON serializations.
@@ -56,6 +59,8 @@ public class AvroStore<K, T extends Pers
/** The property key specifying avro encoder/decoder type to use. Can take values
* "BINARY" or "JSON". */
public static final String CODEC_TYPE_KEY = "codec.type";
+
+ public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class);
/**
* The type of the avro Encoder/Decoder.
@@ -76,16 +81,16 @@ public class AvroStore<K, T extends Pers
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass,
- Properties properties) throws IOException {
- super.initialize(keyClass, persistentClass, properties);
-
- if(properties != null) {
- if(this.codecType == null) {
- String codecType = DataStoreFactory.findProperty(
- properties, this, CODEC_TYPE_KEY, "BINARY");
- this.codecType = CodecType.valueOf(codecType);
+ Properties properties) {
+ super.initialize(keyClass, persistentClass, properties);
+
+ if(properties != null) {
+ if(this.codecType == null) {
+ String codecType = DataStoreFactory.findProperty(
+ properties, this, CODEC_TYPE_KEY, "BINARY");
+ this.codecType = CodecType.valueOf(codecType);
+ }
}
- }
}
public void setCodecType(CodecType codecType) {
@@ -109,22 +114,27 @@ public class AvroStore<K, T extends Pers
}
@Override
- public void close() throws IOException {
- super.close();
- if(encoder != null) {
- encoder.flush();
+ public void close() {
+ try{
+ super.close();
+ if(encoder != null) {
+ encoder.flush();
+ }
+ encoder = null;
+ decoder = null;
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
}
- encoder = null;
- decoder = null;
}
@Override
- public boolean delete(K key) throws IOException {
+ public boolean delete(K key) {
throw new OperationNotSupportedException("delete is not supported for AvroStore");
}
@Override
- public long deleteByQuery(Query<K, T> query) throws IOException {
+ public long deleteByQuery(Query<K, T> query) {
throw new OperationNotSupportedException("delete is not supported for AvroStore");
}
@@ -148,14 +158,19 @@ public class AvroStore<K, T extends Pers
}
@Override
- public void flush() throws IOException {
- super.flush();
- if(encoder != null)
- encoder.flush();
+ public void flush() {
+ try{
+ super.flush();
+ if(encoder != null)
+ encoder.flush();
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
}
@Override
- public T get(K key, String[] fields) throws IOException {
+ public T get(K key, String[] fields) {
throw new OperationNotSupportedException();
}
@@ -165,8 +180,13 @@ public class AvroStore<K, T extends Pers
}
@Override
- public void put(K key, T obj) throws IOException {
- getDatumWriter().write(obj, getEncoder());
+ public void put(K key, T obj) {
+ try{
+ getDatumWriter().write(obj, getEncoder());
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
}
public Encoder getEncoder() throws IOException {
@@ -235,12 +255,12 @@ public class AvroStore<K, T extends Pers
}
@Override
- public void write(DataOutput out) throws IOException {
+ public void write(DataOutput out) {
super.write(out);
}
@Override
- public void readFields(DataInput in) throws IOException {
+ public void readFields(DataInput in) {
super.readFields(in);
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java Wed Oct 31 05:53:43 2012
@@ -32,6 +32,9 @@ import org.apache.gora.query.impl.FileSp
import org.apache.gora.util.OperationNotSupportedException;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* DataFileAvroStore is file based store which uses Avro's
* DataFile{Writer,Reader}'s as a backend. This datastore supports
@@ -39,20 +42,27 @@ import org.apache.hadoop.fs.Path;
*/
public class DataFileAvroStore<K, T extends PersistentBase> extends AvroStore<K, T> {
+ public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class);
+
public DataFileAvroStore() {
}
private DataFileWriter<T> writer;
@Override
- public T get(K key, String[] fields) throws java.io.IOException {
+ public T get(K key, String[] fields) {
throw new OperationNotSupportedException(
"Avro DataFile's does not support indexed retrieval");
};
@Override
- public void put(K key, T obj) throws java.io.IOException {
- getWriter().append(obj);
+ public void put(K key, T obj) {
+ try{
+ getWriter().append(obj);
+ } catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
};
private DataFileWriter<T> getWriter() throws IOException {
@@ -64,18 +74,29 @@ public class DataFileAvroStore<K, T exte
}
@Override
- protected Result<K, T> executeQuery(Query<K, T> query) throws IOException {
- return new DataFileAvroResult<K, T>(this, query
- , createReader(createFsInput()));
+ protected Result<K, T> executeQuery(Query<K, T> query) {
+ try{
+ return new DataFileAvroResult<K, T>(this, query
+ , createReader(createFsInput()));
+ } catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ return null;
+ }
}
@Override
- protected Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query)
- throws IOException {
- FsInput fsInput = createFsInput();
- DataFileReader<T> reader = createReader(fsInput);
- return new DataFileAvroResult<K, T>(this, query, reader, fsInput
- , query.getStart(), query.getLength());
+ protected Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query) {
+ try{
+ FsInput fsInput = createFsInput();
+ DataFileReader<T> reader = createReader(fsInput);
+ return new DataFileAvroResult<K, T>(this, query, reader, fsInput
+ , query.getStart(), query.getLength());
+ } catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ return null;
+ }
}
private DataFileReader<T> createReader(FsInput fsInput) throws IOException {
@@ -88,19 +109,29 @@ public class DataFileAvroStore<K, T exte
}
@Override
- public void flush() throws IOException {
- super.flush();
- if(writer != null) {
- writer.flush();
+ public void flush() {
+ try{
+ super.flush();
+ if(writer != null) {
+ writer.flush();
+ }
+ } catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
}
}
@Override
- public void close() throws IOException {
- if(writer != null)
- writer.close(); //hadoop 0.20.2 HDFS streams do not allow
- //to close twice, so close the writer first
- writer = null;
- super.close();
+ public void close() {
+ try{
+ if(writer != null)
+ writer.close(); //hadoop 0.20.2 HDFS streams do not allow
+ //to close twice, so close the writer first
+ writer = null;
+ super.close();
+ } catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
}
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java Wed Oct 31 05:53:43 2012
@@ -62,7 +62,8 @@ public class MemStore<K, T extends Persi
iterator = map.navigableKeySet().iterator();
}
//@Override
- public void close() throws IOException { }
+ public void close() { }
+
@Override
public float getProgress() throws IOException {
return 0;
@@ -92,12 +93,12 @@ public class MemStore<K, T extends Persi
}
@Override
- public boolean delete(K key) throws IOException {
+ public boolean delete(K key) {
return map.remove(key) != null;
}
@Override
- public long deleteByQuery(Query<K, T> query) throws IOException {
+ public long deleteByQuery(Query<K, T> query) {
try{
long deletedRows = 0;
Result<K,T> result = query.execute();
@@ -114,7 +115,7 @@ public class MemStore<K, T extends Persi
}
@Override
- public Result<K, T> execute(Query<K, T> query) throws IOException {
+ public Result<K, T> execute(Query<K, T> query) {
K startKey = query.getStartKey();
K endKey = query.getEndKey();
if(startKey == null) {
@@ -133,7 +134,7 @@ public class MemStore<K, T extends Persi
}
@Override
- public T get(K key, String[] fields) throws IOException {
+ public T get(K key, String[] fields) {
T obj = map.get(key);
return getPersistent(obj, getFieldsToQuery(fields));
}
@@ -160,7 +161,7 @@ public class MemStore<K, T extends Persi
}
@Override
- public void put(K key, T obj) throws IOException {
+ public void put(K key, T obj) {
map.put(key, obj);
}
@@ -175,23 +176,23 @@ public class MemStore<K, T extends Persi
}
@Override
- public void close() throws IOException {
+ public void close() {
map.clear();
}
@Override
- public void createSchema() throws IOException { }
+ public void createSchema() { }
@Override
- public void deleteSchema() throws IOException {
+ public void deleteSchema() {
map.clear();
}
@Override
- public boolean schemaExists() throws IOException {
+ public boolean schemaExists() {
return true;
}
@Override
- public void flush() throws IOException { }
+ public void flush() { }
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/query/impl/FileSplitPartitionQuery.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/query/impl/FileSplitPartitionQuery.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/query/impl/FileSplitPartitionQuery.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/query/impl/FileSplitPartitionQuery.java Wed Oct 31 05:53:43 2012
@@ -22,7 +22,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.hadoop.mapreduce.InputFormat;
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStore.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStore.java Wed Oct 31 05:53:43 2012
@@ -36,7 +36,9 @@ import org.apache.gora.query.Result;
* <p><a name="visibility"><b>Note:</b> Results of updates ({@link #put(Object, Persistent)},
* {@link #delete(Object)} and {@link #deleteByQuery(Query)} operations) are
* guaranteed to be visible to subsequent get / execute operations ONLY
- * after a subsequent call to {@link #flush()}.
+ * after a subsequent call to {@link #flush()}. Additionally, exception
+ * handling is largely DataStore-specific and is generally not dealt
+ * with from within this interface.
* @param <K> the class of keys in the datastore
* @param <T> the class of persistent objects in the datastore
*/
@@ -47,10 +49,10 @@ public interface DataStore<K, T> {
* @param keyClass the class of the keys
* @param persistentClass the class of the persistent objects
* @param properties extra metadata
- * @throws Exception
+ * @throws IOException
*/
void initialize(Class<K> keyClass, Class<T> persistentClass,
- Properties properties) throws Exception;
+ Properties properties);
/**
* Sets the class of the keys
@@ -87,27 +89,31 @@ public interface DataStore<K, T> {
* to hold the objects. If the schema is already created previously,
* or the underlying data model does not support
* or need this operation, the operation is ignored.
+ * @throws IOException
*/
- void createSchema() throws Exception;
+ void createSchema();
/**
* Deletes the underlying schema or table (or similar) in the datastore
* that holds the objects. This also deletes all the data associated with
* the schema.
+ * @throws IOException
*/
- void deleteSchema() throws Exception;
+ void deleteSchema();
/**
* Deletes all the data associated with the schema, but keeps the
* schema (table or similar) intact.
+ * @throws IOException
*/
- void truncateSchema() throws Exception;
+ void truncateSchema();
/**
* Returns whether the schema that holds the data exists in the datastore.
* @return whether schema exists
+ * @throws IOException
*/
- boolean schemaExists() throws Exception;
+ boolean schemaExists();
/**
* Returns a new instance of the key object. If the object cannot be instantiated
@@ -115,59 +121,67 @@ public interface DataStore<K, T> {
* constructor) it throws an exception. Only use this function if you can
* make sure that the key class has a no-arg constructor.
* @return a new instance of the key object.
+ * @throws IOException
*/
- K newKey() throws Exception;
+ K newKey();
/**
* Returns a new instance of the managed persistent object.
* @return a new instance of the managed persistent object.
+ * @throws IOException
*/
- T newPersistent() throws Exception;
+ T newPersistent();
/**
* Returns the object corresponding to the given key fetching all the fields.
* @param key the key of the object
* @return the Object corresponding to the key or null if it cannot be found
+ * @throws IOException
*/
- T get(K key) throws Exception;
+ T get(K key);
/**
* Returns the object corresponding to the given key.
* @param key the key of the object
* @param fields the fields required in the object. Pass null, to retrieve all fields
* @return the Object corresponding to the key or null if it cannot be found
+ * @throws IOException
*/
- T get(K key, String[] fields) throws Exception;
+ T get(K key, String[] fields);
/**
* Inserts the persistent object with the given key. If an
* object with the same key already exists it will silently
* be replaced. See also the note on
* <a href="#visibility">visibility</a>.
+ * @throws IOException
*/
- void put(K key, T obj) throws Exception;
+ void put(K key, T obj);
/**
* Deletes the object with the given key
* @param key the key of the object
* @return whether the object was successfully deleted
+ * @throws IOException
*/
- boolean delete(K key) throws Exception;
+ boolean delete(K key);
/**
* Deletes all the objects matching the query.
* See also the note on <a href="#visibility">visibility</a>.
* @param query matching records to this query will be deleted
* @return number of deleted records
+ * @throws IOException
*/
- long deleteByQuery(Query<K, T> query) throws Exception;
+ long deleteByQuery(Query<K, T> query);
/**
* Executes the given query and returns the results.
* @param query the query to execute.
* @return the results as a {@link Result} object.
+ * @throws IOException
*/
- Result<K,T> execute(Query<K, T> query) throws Exception;
+ Result<K,T> execute(Query<K, T> query);
/**
* Constructs and returns a new Query.
@@ -191,8 +205,9 @@ public interface DataStore<K, T> {
* optimize their writing by deferring the actual put / delete operations
* until this moment.
* See also the note on <a href="#visibility">visibility</a>.
+ * @throws IOException
*/
- void flush() throws Exception;
+ void flush();
/**
* Sets the {@link BeanFactory} to use by the DataStore.
@@ -211,11 +226,8 @@ public interface DataStore<K, T> {
* implementation, so that the instance is ready for GC.
* All other DataStore methods cannot be used after this
* method was called. Subsequent calls of this method are ignored.
+ * @throws IOException
*/
- void close() throws IOException, InterruptedException, Exception;
-
- //void readFields(Object in) throws Exception;
-
- //void write() throws Exception;
+ void close();
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java Wed Oct 31 05:53:43 2012
@@ -98,7 +98,7 @@ public class DataStoreFactory{
private static <K, T extends Persistent> void initializeDataStore(
DataStore<K, T> dataStore, Class<K> keyClass, Class<T> persistent,
- Properties properties) throws IOException, Exception {
+ Properties properties) throws IOException {
dataStore.initialize(keyClass, persistent, properties);
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java Wed Oct 31 05:53:43 2012
@@ -30,6 +30,7 @@ import org.apache.avro.Schema.Field;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.gora.avro.PersistentDatumReader;
import org.apache.gora.avro.PersistentDatumWriter;
+import org.apache.gora.avro.store.AvroStore;
import org.apache.gora.persistency.BeanFactory;
import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.persistency.impl.PersistentBase;
@@ -41,9 +42,10 @@ import org.apache.gora.util.StringUtils;
import org.apache.gora.util.WritableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Base class for Avro persistent {@link DataStore}s.
@@ -71,25 +73,27 @@ implements DataStore<K, T>, Configurable
protected PersistentDatumReader<T> datumReader;
protected PersistentDatumWriter<T> datumWriter;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class);
public DataStoreBase() {
}
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass,
- Properties properties) throws IOException {
- setKeyClass(keyClass);
- setPersistentClass(persistentClass);
- if(this.beanFactory == null)
- this.beanFactory = new BeanFactoryImpl<K, T>(keyClass, persistentClass);
- schema = this.beanFactory.getCachedPersistent().getSchema();
- fieldMap = AvroUtils.getFieldMap(schema);
-
- autoCreateSchema = DataStoreFactory.getAutoCreateSchema(properties, this);
- this.properties = properties;
-
- datumReader = new PersistentDatumReader<T>(schema, false);
- datumWriter = new PersistentDatumWriter<T>(schema, false);
+ Properties properties) {
+ setKeyClass(keyClass);
+ setPersistentClass(persistentClass);
+ if(this.beanFactory == null)
+ this.beanFactory = new BeanFactoryImpl<K, T>(keyClass, persistentClass);
+ schema = this.beanFactory.getCachedPersistent().getSchema();
+ fieldMap = AvroUtils.getFieldMap(schema);
+
+ autoCreateSchema = DataStoreFactory.getAutoCreateSchema(properties, this);
+ this.properties = properties;
+
+ datumReader = new PersistentDatumReader<T>(schema, false);
+ datumWriter = new PersistentDatumWriter<T>(schema, false);
}
@Override
@@ -114,20 +118,24 @@ implements DataStore<K, T>, Configurable
}
@Override
- public K newKey() throws IOException {
+ public K newKey() {
try {
return beanFactory.newKey();
} catch (Exception ex) {
- throw new IOException(ex);
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ return null;
}
}
@Override
- public T newPersistent() throws IOException {
+ public T newPersistent() {
try {
return beanFactory.newPersistent();
} catch (Exception ex) {
- throw new IOException(ex);
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ return null;
}
}
@@ -142,7 +150,7 @@ implements DataStore<K, T>, Configurable
}
@Override
- public T get(K key) throws IOException, Exception {
+ public T get(K key) {
return get(key, getFieldsToQuery(null));
};
@@ -176,21 +184,30 @@ implements DataStore<K, T>, Configurable
}
@SuppressWarnings("unchecked")
- public void readFields(DataInput in) throws IOException {
+ public void readFields(DataInput in) {
try {
Class<K> keyClass = (Class<K>) ClassLoadingUtils.loadClass(Text.readString(in));
Class<T> persistentClass = (Class<T>)ClassLoadingUtils.loadClass(Text.readString(in));
Properties props = WritableUtils.readProperties(in);
initialize(keyClass, persistentClass, props);
} catch (ClassNotFoundException ex) {
- throw new IOException(ex);
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
}
}
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, getKeyClass().getCanonicalName());
- Text.writeString(out, getPersistentClass().getCanonicalName());
- WritableUtils.writeProperties(out, properties);
+ public void write(DataOutput out) {
+ try {
+ Text.writeString(out, getKeyClass().getCanonicalName());
+ Text.writeString(out, getPersistentClass().getCanonicalName());
+ WritableUtils.writeProperties(out, properties);
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
+ }
}
@Override
@@ -208,7 +225,7 @@ implements DataStore<K, T>, Configurable
@Override
/** Default implementation deletes and recreates the schema*/
- public void truncateSchema() throws IOException, Exception {
+ public void truncateSchema() {
deleteSchema();
createSchema();
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java Wed Oct 31 05:53:43 2012
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import org.apache.gora.avro.store.AvroStore;
import org.apache.gora.mapreduce.GoraMapReduceUtils;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
@@ -42,6 +43,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base implementations for {@link FileBackedDataStore} methods.
@@ -56,10 +59,12 @@ public abstract class FileBackedDataStor
protected InputStream inputStream;
protected OutputStream outputStream;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class);
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass,
- Properties properties) throws IOException {
+ Properties properties) {
super.initialize(keyClass, persistentClass, properties);
if(properties != null) {
if(this.inputPath == null) {
@@ -122,17 +127,28 @@ public InputStream getInputStream() {
}
/** Opens an OutputStream for the output Hadoop path */
- protected OutputStream createOutputStream() throws IOException {
- Path path = new Path(outputPath);
- FileSystem fs = path.getFileSystem(getConf());
- return fs.create(path);
+ protected OutputStream createOutputStream() {
+ OutputStream conf = null;
+ try{
+ Path path = new Path(outputPath);
+ FileSystem fs = path.getFileSystem(getConf());
+ conf = fs.create(path);
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
+ return conf;
}
protected InputStream getOrCreateInputStream() throws IOException {
- if(inputStream == null) {
- inputStream = createInputStream();
+ try{
+ if(inputStream == null) {
+ inputStream = createInputStream();
+ }
+ return inputStream;
+ }catch(IOException ex){
+ throw new IOException(ex);
}
- return inputStream;
}
protected OutputStream getOrCreateOutputStream() throws IOException {
@@ -143,25 +159,37 @@ public InputStream getInputStream() {
}
@Override
- public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
- throws IOException {
- List<InputSplit> splits = GoraMapReduceUtils.getSplits(getConf(), inputPath);
- List<PartitionQuery<K, T>> queries = new ArrayList<PartitionQuery<K,T>>(splits.size());
-
- for(InputSplit split : splits) {
- queries.add(new FileSplitPartitionQuery<K, T>(query, (FileSplit) split));
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query){
+ List<InputSplit> splits = null;
+ List<PartitionQuery<K, T>> queries = null;
+ try{
+ splits = GoraMapReduceUtils.getSplits(getConf(), inputPath);
+ queries = new ArrayList<PartitionQuery<K,T>>(splits.size());
+
+ for(InputSplit split : splits) {
+ queries.add(new FileSplitPartitionQuery<K, T>(query, (FileSplit) split));
+ }
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
}
-
return queries;
}
@Override
- public Result<K, T> execute(Query<K, T> query) throws IOException {
- if(query instanceof FileSplitPartitionQuery) {
- return executePartial((FileSplitPartitionQuery<K, T>) query);
- } else {
- return executeQuery(query);
+ public Result<K, T> execute(Query<K, T> query) {
+ Result<K, T> results = null;
+ try{
+ if(query instanceof FileSplitPartitionQuery) {
+ results = executePartial((FileSplitPartitionQuery<K, T>) query);
+ } else {
+ results = executeQuery(query);
+ }
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
}
+ return results;
}
/**
@@ -178,51 +206,66 @@ public InputStream getInputStream() {
throws IOException;
@Override
- public void flush() throws IOException {
- if(outputStream != null)
- outputStream.flush();
+ public void flush() {
+ try{
+ if(outputStream != null)
+ outputStream.flush();
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
}
@Override
- public void createSchema() throws IOException {
+ public void createSchema() {
}
@Override
- public void deleteSchema() throws IOException {
+ public void deleteSchema() {
throw new OperationNotSupportedException("delete schema is not supported for " +
"file backed data stores");
}
@Override
- public boolean schemaExists() throws IOException {
+ public boolean schemaExists() {
return true;
}
@Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- org.apache.gora.util.IOUtils.writeNullFieldsInfo(out, inputPath, outputPath);
- if(inputPath != null)
- Text.writeString(out, inputPath);
- if(outputPath != null)
- Text.writeString(out, outputPath);
+ public void write(DataOutput out) {
+ try{
+ super.write(out);
+ org.apache.gora.util.IOUtils.writeNullFieldsInfo(out, inputPath, outputPath);
+ if(inputPath != null)
+ Text.writeString(out, inputPath);
+ if(outputPath != null)
+ Text.writeString(out, outputPath);
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
}
@Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- boolean[] nullFields = org.apache.gora.util.IOUtils.readNullFieldsInfo(in);
- if(!nullFields[0])
- inputPath = Text.readString(in);
- if(!nullFields[1])
- outputPath = Text.readString(in);
+ public void readFields(DataInput in) {
+ try{
+ super.readFields(in);
+ boolean[] nullFields = org.apache.gora.util.IOUtils.readNullFieldsInfo(in);
+ if(!nullFields[0])
+ inputPath = Text.readString(in);
+ if(!nullFields[1])
+ outputPath = Text.readString(in);
+ }catch(IOException ex){
+ LOG.error(ex.getMessage());
+ LOG.error(ex.getStackTrace().toString());
+ }
}
@Override
- public void close() throws IOException {
- IOUtils.closeStream(inputStream);
- IOUtils.closeStream(outputStream);
- inputStream = null;
- outputStream = null;
+ public void close() {
+ IOUtils.closeStream(inputStream);
+ IOUtils.closeStream(outputStream);
+ inputStream = null;
+ outputStream = null;
}
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java Wed Oct 31 05:53:43 2012
@@ -38,9 +38,10 @@ public abstract class WSBackedDataStoreB
@Override
/**
* Initializes a web service backed data store
+ * @throws IOException
*/
public void initialize(Class<K> keyClass, Class<T> persistentClass,
- Properties properties) throws Exception {
+ Properties properties) {
super.initialize(keyClass, persistentClass, properties);
}
@@ -48,9 +49,13 @@ public abstract class WSBackedDataStoreB
/**
* Executes a query inside a web service backed data store
*/
- public Result<K, T> execute(Query<K, T> query) throws Exception {
- // TODO We could have different types of execution {@link FileBackedDataStoreBase}
- return executeQuery(query);
+ public Result<K, T> execute(Query<K, T> query) {
+ try {
+ return executeQuery(query);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
}
/**
@@ -58,27 +63,27 @@ public abstract class WSBackedDataStoreB
* for non-PartitionQuery's.
*/
protected abstract Result<K,T> executeQuery(Query<K,T> query)
- throws Exception;
+ throws IOException;
@Override
/**
* Flushes objects into the data store
*/
- public void flush() throws Exception {
+ public void flush() {
}
@Override
/**
* Creates schema into the data store
*/
- public void createSchema() throws Exception{
+ public void createSchema() {
}
@Override
/**
* Deletes schema from the data store
*/
- public void deleteSchema() throws Exception {
+ public void deleteSchema() {
throw new OperationNotSupportedException("delete schema is not supported for " +
"file backed data stores");
}
@@ -87,7 +92,7 @@ public abstract class WSBackedDataStoreB
/**
* Verifies if a schema exists
*/
- public boolean schemaExists() throws Exception {
+ public boolean schemaExists() {
return true;
}
@@ -111,6 +116,6 @@ public abstract class WSBackedDataStoreB
/**
* Closes the data store
*/
- public void close() throws IOException, InterruptedException, Exception {
+ public void close() {
}
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java Wed Oct 31 05:53:43 2012
@@ -20,6 +20,7 @@
*/
package org.apache.gora.store.ws.impl;
+import java.io.IOException;
import java.util.Properties;
import org.apache.gora.persistency.Persistent;
@@ -68,7 +69,7 @@ implements DataStore<K, T>{
* Initializes the web services backed data store
*/
public void initialize(Class<K> keyClass, Class<T> persistentClass,
- Properties properties) throws Exception {
+ Properties properties) {
setKeyClass(keyClass);
setPersistentClass(persistentClass);
}
@@ -149,7 +150,7 @@ implements DataStore<K, T>{
@Override
/** Default implementation deletes and recreates the schema*/
- public void truncateSchema() throws Exception {
+ public void truncateSchema() {
deleteSchema();
createSchema();
}
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java Wed Oct 31 05:53:43 2012
@@ -145,6 +145,7 @@ public class WSDataStoreFactory{
* @return A new store instance.
* @throws GoraException
*/
+ @SuppressWarnings("unchecked")
public static <D extends DataStore<K,T>, K, T extends Persistent>
D createDataStore(Class<D> dataStoreClass, Class<K> keyClass
, Class<T> persistent, Object auth, Properties properties, String schemaName)
Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java (original)
+++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java Wed Oct 31 05:53:43 2012
@@ -42,7 +42,6 @@ import org.apache.avro.ipc.ByteBufferInp
import org.apache.avro.ipc.ByteBufferOutputStream;
import org.apache.gora.avro.PersistentDatumReader;
import org.apache.gora.avro.PersistentDatumWriter;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
Modified: gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java (original)
+++ gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java Wed Oct 31 05:53:43 2012
@@ -71,11 +71,8 @@ public class GoraTestDriver {
*/
public void setUp() throws Exception {
log.info("setting up test");
- try {
- for(DataStore store : dataStores) {
- store.truncateSchema();
- }
- }catch (IOException ignore) {
+ for(DataStore store : dataStores) {
+ store.truncateSchema();
}
}
Modified: gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java (original)
+++ gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java Wed Oct 31 05:53:43 2012
@@ -57,49 +57,47 @@ public class MockDataStore extends DataS
}
@Override
- public void close() throws IOException {
+ public void close() {
}
@Override
- public void createSchema() throws IOException {
+ public void createSchema() {
}
@Override
- public void deleteSchema() throws IOException {
+ public void deleteSchema() {
}
@Override
- public void truncateSchema() throws IOException {
+ public void truncateSchema() {
}
@Override
- public boolean schemaExists() throws IOException {
+ public boolean schemaExists() {
return true;
}
@Override
- public boolean delete(String key) throws IOException {
+ public boolean delete(String key) {
return false;
}
@Override
- public long deleteByQuery(Query<String, MockPersistent> query)
- throws IOException {
+ public long deleteByQuery(Query<String, MockPersistent> query) {
return 0;
}
@Override
- public Result<String, MockPersistent> execute(
- Query<String, MockPersistent> query) throws IOException {
+ public Result<String, MockPersistent> execute(Query<String, MockPersistent> query) {
return null;
}
@Override
- public void flush() throws IOException {
+ public void flush() {
}
@Override
- public MockPersistent get(String key, String[] fields) throws IOException {
+ public MockPersistent get(String key, String[] fields) {
return null;
}
@@ -133,7 +131,7 @@ public class MockDataStore extends DataS
}
@Override
- public void put(String key, MockPersistent obj) throws IOException {
+ public void put(String key, MockPersistent obj) {
}
@Override
Modified: gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java (original)
+++ gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java Wed Oct 31 05:53:43 2012
@@ -139,8 +139,8 @@ public class DynamoDBQuery<K, T extends
Condition newCondition = buildRangeCondition();
buildQueryExpression(newCondition, hashAttrValue);
}
- if (DynamoDBQuery.getType().equals(SCAN_QUERY))
- buildScanExpression(hashAttrValue);
+ if (DynamoDBQuery.getType().equals(SCAN_QUERY))
+ buildScanExpression(hashAttrValue);
}
/**
Modified: gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff
==============================================================================
--- gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java (original)
+++ gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java Wed Oct 31 05:53:43 2012
@@ -158,7 +158,7 @@ public class DynamoDBStore<K, T extends
* setting the client's properties up, setting the end point and reading the mapping file
*/
public void initialize(Class<K> keyClass, Class<T> pPersistentClass,
- Properties properties) throws Exception {
+ Properties properties) {
try {
LOG.debug("Initializing DynamoDB store");
getCredentials();
@@ -172,7 +172,8 @@ public class DynamoDBStore<K, T extends
}
catch (Exception e) {
LOG.error("Error while initializing DynamoDB store");
- throw new IOException(e.getMessage(), e);
+ LOG.error(e.getMessage());
+ LOG.error(e.getStackTrace().toString());
}
}
@@ -209,7 +210,7 @@ public class DynamoDBStore<K, T extends
List<Element> tableElements = root.getChildren("table");
for(Element tableElement : tableElements) {
-
+
String tableName = tableElement.getAttributeValue("name");
long readCapacUnits = Long.parseLong(tableElement.getAttributeValue("readcunit"));
long writeCapacUnits = Long.parseLong(tableElement.getAttributeValue("readcunit"));
@@ -267,8 +268,8 @@ public class DynamoDBStore<K, T extends
if(authentication == null){
InputStream awsCredInpStr = getClass().getClassLoader().getResourceAsStream(awsCredentialsProperties);
- if (awsCredInpStr == null)
- LOG.info("AWS Credentials File was not found on the classpath!");
+ if (awsCredInpStr == null)
+ LOG.info("AWS Credentials File was not found on the classpath!");
AWSCredentials credentials = new PropertiesCredentials(awsCredInpStr);
setConf(credentials);
}
@@ -307,7 +308,7 @@ public class DynamoDBStore<K, T extends
* Executes a query after building a DynamoDB specific query based on the received one
*/
@Override
- public Result<K, T> execute(Query<K, T> query) throws Exception {
+ public Result<K, T> execute(Query<K, T> query) {
DynamoDBQuery<K, T> dynamoDBQuery = buildDynamoDBQuery(query);
DynamoDBMapper mapper = new DynamoDBMapper(dynamoDBClient);
List<T> objList = null;
@@ -319,7 +320,7 @@ public class DynamoDBStore<K, T extends
}
@Override
- public T get(K key, String[] fields) throws Exception {
+ public T get(K key, String[] fields) {
/* DynamoDBQuery<K,T> query = new DynamoDBQuery<K,T>();
query.setDataStore(this);
//query.setKeyRange(key, key);
@@ -331,22 +332,36 @@ public class DynamoDBStore<K, T extends
return null;
}
+ @Override
/**
* Gets the object with the specific key
+ * @throws IOException
*/
- public T get(K key) throws Exception {
+ public T get(K key) {
T object = null;
- Object rangeKey = getRangeKey(key);
- Object hashKey = getHashKey(key);
- if (hashKey != null){
- DynamoDBMapper mapper = new DynamoDBMapper(dynamoDBClient);
- if (rangeKey != null)
+ try {
+ Object rangeKey;
+ rangeKey = getRangeKey(key);
+ Object hashKey = getHashKey(key);
+ if (hashKey != null){
+ DynamoDBMapper mapper = new DynamoDBMapper(dynamoDBClient);
+ if (rangeKey != null)
object = mapper.load(persistentClass, hashKey, rangeKey);
else
object = mapper.load(persistentClass, hashKey);
+ }
+ else
+ throw new GoraException("Error while retrieving keys from object: " + key.toString());
+ } catch (IllegalArgumentException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ } catch (GoraException ge){
+ LOG.error(ge.getMessage());
+ LOG.error(ge.getStackTrace().toString());
}
- else
- throw new GoraException("Error while retrieving keys from object: " + key.toString());
return object;
}
@@ -379,8 +394,10 @@ public class DynamoDBStore<K, T extends
/**
* Creates the table within the data store for a preferred schema or
* for a group of schemas defined within the mapping file
+ * @throws IOException
*/
- public void createSchema() throws Exception {
+ @Override
+ public void createSchema() {
LOG.info("Creating schema");
if (mapping.getTables().isEmpty()) throw new IllegalStateException("There are not tables defined.");
if (preferredSchema == null){
@@ -428,8 +445,10 @@ public class DynamoDBStore<K, T extends
/**
* Deletes all tables present in the mapping object.
+ * @throws IOException
*/
- public void deleteSchema() throws Exception {
+ @Override
+ public void deleteSchema() {
if (mapping.getTables().isEmpty()) throw new IllegalStateException("There are not tables defined.");
if (preferredSchema == null){
LOG.debug("Delete schemas");
@@ -511,8 +530,10 @@ public class DynamoDBStore<K, T extends
/**
* Verifies if the specified schemas exist
+ * @throws IOException
*/
- public boolean schemaExists() throws Exception {
+ @Override
+ public boolean schemaExists() {
LOG.info("Verifying schemas.");
TableDescription success = null;
if (mapping.getTables().isEmpty()) throw new IllegalStateException("There are not tables defined.");
@@ -550,24 +571,41 @@ public class DynamoDBStore<K, T extends
}
return tableDescription;
}
-
- public K newKey() throws Exception {
+ /**
+ * Returns a new instance of the key object.
+ * @throws IOException
+ */
+ @Override
+ public K newKey() {
// TODO Auto-generated method stub
return null;
}
/**
* Returns a new persistent object
+ * @throws IOException
*/
- public T newPersistent() throws Exception {
- T obj = persistentClass.newInstance();
+ @Override
+ public T newPersistent() {
+ T obj = null;
+ try {
+ obj = persistentClass.newInstance();
+ } catch (InstantiationException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
return obj;
}
/**
* Puts an object identified by a key
+ * @throws IOException
*/
- public void put(K key, T obj) throws Exception {
+ @Override
+ public void put(K key, T obj) {
try{
Object rangeKey = getRangeKey(key);
Object hashKey = getHashKey(key);
@@ -589,14 +627,19 @@ public class DynamoDBStore<K, T extends
}catch(NullPointerException npe){
LOG.error("Error while putting an item. " + npe.toString());
npe.printStackTrace();
+ }catch(Exception e){
+ LOG.error("Error while putting an item. " + obj.toString());
+ e.printStackTrace();
}
}
/**
* Deletes the object using key
* @return true for a successful process
+ * @throws IOException
*/
- public boolean delete(K key) throws Exception {
+ @Override
+ public boolean delete(K key) {
try{
T object = null;
Object rangeKey = null, hashKey = null;
@@ -635,9 +678,11 @@ public class DynamoDBStore<K, T extends
/**
* Deletes items using a specific query
+ * @throws IOException
*/
+ @Override
@SuppressWarnings("unchecked")
- public long deleteByQuery(Query<K, T> query) throws Exception {
+ public long deleteByQuery(Query<K, T> query) {
// TODO verify whether or not we are deleting a whole row
//String[] fields = getFieldsToQuery(query.getFields());
//find whether all fields are queried, which means that complete
@@ -646,15 +691,27 @@ public class DynamoDBStore<K, T extends
// , getBeanFactory().getCachedPersistent().getFields());
Result<K, T> result = execute(query);
ArrayList<T> deletes = new ArrayList<T>();
- while(result.next()) {
- T resultObj = result.get();
- deletes.add(resultObj);
-
- @SuppressWarnings("rawtypes")
- DynamoDBKey dKey = new DynamoDBKey();
- dKey.setHashKey(getHashKey(resultObj));
- dKey.setRangeKey(getRangeKey(resultObj));
- delete((K)dKey);
+ try {
+ while(result.next()) {
+ T resultObj = result.get();
+ deletes.add(resultObj);
+
+ @SuppressWarnings("rawtypes")
+ DynamoDBKey dKey = new DynamoDBKey();
+
+ dKey.setHashKey(getHashKey(resultObj));
+
+ dKey.setRangeKey(getRangeKey(resultObj));
+ delete((K)dKey);
+ }
+ } catch (IllegalArgumentException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
}
return deletes.size();
}
@@ -743,8 +800,12 @@ public class DynamoDBStore<K, T extends
// TODO Auto-generated method stub
return null;
}
-
- public void flush() throws Exception {
+ @Override
+ /**
+ * flushes objects to DynamoDB
+ * @throws IOException
+ */
+ public void flush() {
// TODO Auto-generated method stub
}
@@ -757,10 +818,11 @@ public class DynamoDBStore<K, T extends
return null;
}
+ @Override
/**
* Closes the data store.
*/
- public void close() throws IOException, InterruptedException, Exception {
+ public void close() {
LOG.debug("Datastore closed.");
flush();
}